colab/Colab_UnityEnvironment_1_Run.ipynb
#@title Install Rendering Dependencies { display-mode: "form" }
#@markdown (You only need to run this code when using Colab's hosted runtime)
import os
from IPython.display import HTML, display
def progress(value, max=100):
    """Return an IPython HTML progress bar showing `value` out of `max`.

    `max` keeps its (builtin-shadowing) name for backward compatibility with
    keyword callers; both call sites in this notebook pass positionally.
    """
    # Fix: the original template had a stray ',' after max='{max}'.
    # HTML attributes are whitespace-separated; a comma is invalid markup.
    return HTML("""
    <progress
        value='{value}'
        max='{max}'
        style='width: 100%'
    >
        {value}
    </progress>
    """.format(value=value, max=max))
# Show a live progress bar for the (potentially slow) install steps below.
pro_bar = display(progress(0, 100), display_id=True)
# Install Xvfb only on Colab (where google.colab is importable) or when the
# user explicitly opts in via the COLAB_ALWAYS_INSTALL_XVFB env variable.
try:
    import google.colab
    INSTALL_XVFB = True
except ImportError:
    INSTALL_XVFB = 'COLAB_ALWAYS_INSTALL_XVFB' in os.environ
if INSTALL_XVFB:
    # IPython shell magics: fetch package lists and install the Xvfb
    # virtual framebuffer so Unity can render without a physical display.
    !sudo apt-get update -qq
    pro_bar.update(progress(50, 100))
    !sudo DEBIAN_FRONTEND=noninteractive apt-get install -y -qq xvfb
    pro_bar.update(progress(90, 100))
    import subprocess
    # Launch a headless X server on display :1 (1024x768, 24-bit, GLX enabled).
    # Output is discarded; the process keeps running for the session's lifetime.
    subprocess.Popen(['Xvfb', ':1', '-screen', '0', '1024x768x24', '-ac', '+extension', 'GLX', '+render', '-noreset'],
                     stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    # Point GUI/GL applications at the virtual display started above.
    os.environ["DISPLAY"] = ":1"
pro_bar.update(progress(100, 100))
# Install the ml-agents Python package only if it is not already importable.
try:
    import mlagents
    print("ml-agents already installed")
except ImportError:
    # IPython shell magic: pip-install a pinned ml-agents release.
    !python -m pip install -q mlagents==1.1.0
    print("Installed ml-agents")
#@title Select Environment { display-mode: "form" }
env_id = "GridWorld" #@param ['Basic', '3DBall', '3DBallHard', 'GridWorld', 'Hallway', 'VisualHallway', 'CrawlerDynamicTarget', 'CrawlerStaticTarget', 'Bouncer', 'SoccerTwos', 'PushBlock', 'VisualPushBlock', 'WallJump', 'Tennis', 'Reacher', 'Pyramids', 'VisualPyramids', 'Walker', 'FoodCollector', 'VisualFoodCollector', 'StrikersVsGoalie', 'WormStaticTarget', 'WormDynamicTarget']
# -----------------
# This code is used to close an env that might not have been closed before
try:
    env.close()
except Exception:
    # `env` may not exist yet (NameError) or may fail to close — either way
    # it is safe to continue and create a fresh environment below.
    # Fix: narrowed from a bare `except:`, which would also have swallowed
    # KeyboardInterrupt/SystemExit.
    pass
# -----------------
from mlagents_envs.registry import default_registry
# Download (if needed) and launch the selected registered Unity environment.
env = default_registry[env_id].make()
To reset the environment, simply call env.reset(). This method takes no arguments and returns nothing, but it sends a signal to the simulation to reset.
# Reset the simulation and inspect the first registered Behavior's spec.
env.reset()
# We will only consider the first Behavior
behavior_name = list(env.behavior_specs)[0]
print(f"Name of the behavior : {behavior_name}")
spec = env.behavior_specs[behavior_name]
# Examine the number of observations per Agent
print("Number of observations : ", len(spec.observation_specs))
# Is there a visual observation ?
# Visual observation have 3 dimensions: Height, Width and number of channels
# Fix: the generator variable was named `spec`, shadowing the outer `spec`
# it iterates over; renamed to `obs_spec` for clarity (behavior unchanged).
vis_obs = any(len(obs_spec.shape) == 3 for obs_spec in spec.observation_specs)
print("Is there a visual observation ?", vis_obs)
# Is the Action continuous or multi-discrete ?
# A spec can have both continuous and discrete components, so both branches
# below may print.
if spec.action_spec.continuous_size > 0:
    print(f"There are {spec.action_spec.continuous_size} continuous actions")
if spec.action_spec.is_discrete():
    print(f"There are {spec.action_spec.discrete_size} discrete actions")
# Removed stale commented-out code that referenced `spec.action_size`,
# which does not exist on the current spec (the API is `spec.action_spec.*`).
# For discrete actions only : How many different options does each action have ?
if spec.action_spec.discrete_size > 0:
    for action, branch_size in enumerate(spec.action_spec.discrete_branches):
        print(f"Action number {action} has {branch_size} different options")
You can do this with the env.get_steps(behavior_name) method. If there are multiple behaviors in the Environment, you can call this method with each of the behavior's names.
Note: This will not move the simulation forward.
decision_steps, terminal_steps = env.get_steps(behavior_name)
You can set the actions for the Agents of a Behavior by calling env.set_actions(); you will need to specify the behavior name and pass a tensor of dimension 2. The first dimension of the action must be equal to the number of Agents that requested a decision during the step.
env.set_actions(behavior_name, spec.action_spec.empty_action(len(decision_steps)))
Call env.step() to move the simulation forward. The simulation will progress until an Agent requests a decision or terminates.
env.step()
DecisionSteps.obs is a tuple containing all of the observations for all of the Agents with the provided Behavior name.
Each value in the tuple is an observation tensor containing the observation data for all of the agents.
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline
# Show the first agent's visual observations (rank-3 tensors), if any.
for index, obs_spec in enumerate(spec.observation_specs):
    if len(obs_spec.shape) == 3:
        print("Here is the first visual observation")
        # NOTE(review): moveaxis(0, -1) assumes the image arrives
        # channels-first (C, H, W) — confirm against the actual
        # observation layout for this environment.
        plt.imshow(np.moveaxis(decision_steps.obs[index][0, :, :, :], 0, -1))
        plt.show()
# Print the first agent's vector (rank-1) observations.
for index, obs_spec in enumerate(spec.observation_specs):
    if len(obs_spec.shape) == 1:
        print("First vector observations : ", decision_steps.obs[index][0,:])
# Run three episodes with random actions, accumulating the reward of one
# tracked agent per episode.
for episode in range(3):
    env.reset()
    decision_steps, terminal_steps = env.get_steps(behavior_name)
    tracked_agent = -1   # -1 means no agent has been picked to follow yet
    episode_rewards = 0  # running reward total for the tracked agent
    while True:
        # Start following the first agent that requests a decision.
        # (len(decision_steps) is the number of agents awaiting an action.)
        if tracked_agent == -1 and len(decision_steps) >= 1:
            tracked_agent = decision_steps.agent_id[0]
        # Sample a random action for every deciding agent and submit it.
        env.set_actions(behavior_name, spec.action_spec.random_action(len(decision_steps)))
        # Advance the simulation and fetch the fresh step results.
        env.step()
        decision_steps, terminal_steps = env.get_steps(behavior_name)
        if tracked_agent in decision_steps:  # the agent requested another decision
            episode_rewards += decision_steps[tracked_agent].reward
        if tracked_agent in terminal_steps:  # the agent's episode ended
            episode_rewards += terminal_steps[tracked_agent].reward
            break
    print(f"Total rewards for episode {episode} is {episode_rewards}")
# Shut down the Unity environment and release its resources.
env.close()
print("Closed environment")