stable-baselines3-contrib-sacd/tests/test_cnn.py

215 lines
7.5 KiB
Python

import os
import random
from copy import deepcopy
import numpy as np
import pytest
import torch as th
from stable_baselines3.common.envs import FakeImageEnv
from stable_baselines3.common.utils import zip_strict
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize, VecTransposeImage, is_vecenv_wrapped
from sb3_contrib import QRDQN, TQC, TRPO, MaskablePPO, RecurrentPPO
from sb3_contrib.common.wrappers import ActionMasker
@pytest.mark.parametrize("model_class", [TQC, QRDQN, TRPO])
@pytest.mark.parametrize("share_features_extractor", [True, False])
def test_cnn(tmp_path, model_class, share_features_extractor):
SAVE_NAME = "cnn_model.zip"
# Fake grayscale with frameskip
# Atari after preprocessing: 84x84x1, here we are using lower resolution
# to check that the network handle it automatically
env = FakeImageEnv(
screen_height=40,
screen_width=40,
n_channels=1,
discrete=model_class not in {TQC},
)
kwargs = dict(policy_kwargs=dict(share_features_extractor=share_features_extractor))
if model_class in {TQC, QRDQN}:
# share_features_extractor is checked later for offpolicy algorithms
if share_features_extractor:
return
# Avoid memory error when using replay buffer
# Reduce the size of the features and the number of quantiles
kwargs = dict(
buffer_size=250,
policy_kwargs=dict(
n_quantiles=25,
features_extractor_kwargs=dict(features_dim=32),
),
)
model = model_class("CnnPolicy", env, **kwargs).learn(250)
obs = env.reset()
# FakeImageEnv is channel last by default and should be wrapped
assert is_vecenv_wrapped(model.get_env(), VecTransposeImage)
# Test stochastic predict with channel last input
if model_class == QRDQN:
model.exploration_rate = 0.9
for _ in range(10):
model.predict(obs, deterministic=False)
action, _ = model.predict(obs, deterministic=True)
model.save(tmp_path / SAVE_NAME)
del model
model = model_class.load(tmp_path / SAVE_NAME)
# Check that the prediction is the same
assert np.allclose(action, model.predict(obs, deterministic=True)[0])
os.remove(str(tmp_path / SAVE_NAME))
def patch_qrdqn_names_(model):
# Small hack to make the test work with QRDQN
if isinstance(model, QRDQN):
model.critic = model.quantile_net
model.critic_target = model.quantile_net_target
def params_should_match(params, other_params):
for param, other_param in zip_strict(params, other_params):
assert th.allclose(param, other_param)
def params_should_differ(params, other_params):
for param, other_param in zip_strict(params, other_params):
assert not th.allclose(param, other_param)
@pytest.mark.parametrize("model_class", [TQC, QRDQN])
@pytest.mark.parametrize("share_features_extractor", [True, False])
def test_feature_extractor_target_net(model_class, share_features_extractor):
if model_class == QRDQN and share_features_extractor:
pytest.skip()
env = FakeImageEnv(screen_height=40, screen_width=40, n_channels=1, discrete=model_class not in {TQC})
if model_class in {TQC, QRDQN}:
# Avoid memory error when using replay buffer
# Reduce the size of the features and the number of quantiles
kwargs = dict(
buffer_size=250,
learning_starts=100,
policy_kwargs=dict(n_quantiles=25, features_extractor_kwargs=dict(features_dim=32)),
)
if model_class != QRDQN:
kwargs["policy_kwargs"]["share_features_extractor"] = share_features_extractor
model = model_class("CnnPolicy", env, seed=0, **kwargs)
patch_qrdqn_names_(model)
if share_features_extractor:
# Check that the objects are the same and not just copied
assert id(model.policy.actor.features_extractor) == id(model.policy.critic.features_extractor)
else:
# Check that the objects differ
if model_class != QRDQN:
assert id(model.policy.actor.features_extractor) != id(model.policy.critic.features_extractor)
# Critic and target should be equal at the begginning of training
params_should_match(model.critic.parameters(), model.critic_target.parameters())
model.learn(200)
# Critic and target should differ
params_should_differ(model.critic.parameters(), model.critic_target.parameters())
# Re-initialize and collect some random data (without doing gradient steps)
model = model_class("CnnPolicy", env, seed=0, **kwargs).learn(10)
patch_qrdqn_names_(model)
original_param = deepcopy(list(model.critic.parameters()))
original_target_param = deepcopy(list(model.critic_target.parameters()))
# Deactivate copy to target
model.tau = 0.0
model.train(gradient_steps=1)
# Target should be the same
params_should_match(original_target_param, model.critic_target.parameters())
# not the same for critic net (updated by gradient descent)
params_should_differ(original_param, model.critic.parameters())
# Update the reference as it should not change in the next step
original_param = deepcopy(list(model.critic.parameters()))
# Deactivate learning rate
model.lr_schedule = lambda _: 0.0
# Re-activate polyak update
model.tau = 0.01
# Special case for QRDQN: target net is updated in the `collect_rollouts()`
# not the `train()` method
if model_class == QRDQN:
model.target_update_interval = 1
model._on_step()
model.train(gradient_steps=1)
# Target should have changed now (due to polyak update)
params_should_differ(original_target_param, model.critic_target.parameters())
# Critic should be the same
params_should_match(original_param, model.critic.parameters())
@pytest.mark.parametrize("model_class", [TRPO, MaskablePPO, RecurrentPPO, QRDQN, TQC])
@pytest.mark.parametrize("normalize_images", [True, False])
def test_image_like_input(model_class, normalize_images):
"""
Check that we can handle image-like input (3D tensor)
when normalize_images=False
"""
# Fake grayscale with frameskip
# Atari after preprocessing: 84x84x1, here we are using lower resolution
# to check that the network handle it automatically
env = FakeImageEnv(
screen_height=36,
screen_width=36,
n_channels=1,
channel_first=True,
discrete=model_class not in {TQC},
)
if model_class == MaskablePPO:
def action_mask_fn(env):
random_invalid_action = random.randrange(env.action_space.n)
return [i != random_invalid_action for i in range(env.action_space.n)]
env = ActionMasker(env, action_mask_fn)
vec_env = VecNormalize(DummyVecEnv([lambda: env]))
# Reduce the size of the features
# deactivate normalization
kwargs = dict(
policy_kwargs=dict(
normalize_images=normalize_images,
features_extractor_kwargs=dict(features_dim=32),
),
seed=1,
)
policy = "CnnLstmPolicy" if model_class == RecurrentPPO else "CnnPolicy"
if model_class in {TRPO, MaskablePPO, RecurrentPPO}:
kwargs.update(dict(n_steps=64, batch_size=64))
else:
# Avoid memory error when using replay buffer
# Reduce the size of the features
kwargs.update(dict(buffer_size=250))
if normalize_images:
with pytest.raises(AssertionError):
model_class(policy, vec_env, **kwargs).learn(128)
else:
model_class(policy, vec_env, **kwargs).learn(128)