Extension - Gym Environment Interface: minimal example
[ ]:
import robotic as ry
import gymnasium as gym
import numpy as np
import time
print('ry version:', ry.__version__, ry.compiled())
Let’s first create a configuration
[ ]:
C = ry.Config()
C.addFile(ry.raiPath('scenarios/pandaSingle.g'))
C.setJointState([.007], ['l_panda_finger_joint1']) #only cosmetics
#the following makes it a LOT simpler
C.setJointState([.0], ['l_panda_joint2'])
C.setJointState([.7], ['l_panda_joint7'])
gripper = 'l_gripper'
box = C.addFrame('box') \
    .setShape(ry.ST.ssBox, size=[.1,.1,.1,.005]) .setColor([1,.5,0]) \
    .setPosition([.06,.35,.7]) \
    .setMass(.1) \
    .setAttributes({'friction': .5})
box.setPosition([.1,.35,.7]) # this is the initial position of the box - change manually to make harder

C.addFrame('plate', gripper) \
    .setShape(ry.ST.ssBox, size=[.02,.2,.36,.005]) .setColor([.5,1,0]) \
    .setRelativePosition([0,0,-.16]) \
    .setAttributes({'friction': .1})

C.addFrame('target') \
    .setShape(ry.ST.marker, size=[.1]) .setColor([0,1,0]) \
    .setPosition([.3,.3,.7])
C.view()
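Before building the environment, it can help to verify the configuration. The following optional sketch (using the standard Config accessors getJointNames, getJointState and getFrameNames) prints the joints and frames we just set up:
[ ]:
# optional sanity check of the configuration we just built
print('joints:', C.getJointNames())
print('initial joint state:', C.getJointState())
print('frames:', C.getFrameNames())
print('box position:', C.getFrame('box').getPosition())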
Here is a basic class implementing a gym.Env:
[ ]:
class RoboticGym(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}

    tau_env = .05 #a gym environment step corresponds to 0.05 sec
    tau_sim = .01 #the underlying physics simulation is stepped with 0.01 sec (5 sim steps in each env step)
    time = 0.
    render_mode = 'human'
    viewSteps = False
    random_reset = False

    def __init__(self, C:ry.Config, time_limit, sim_verbose=0):
        self.time_limit = time_limit

        # create/load the robotic configuration
        self.C = C
        self.q0 = self.C.getJointState()
        self.X0 = self.C.getFrameState()
        self.box = self.C.getFrame('box')
        n = self.q0.size

        # define the observation space (see also observation_fct())
        self.observation_space = gym.spaces.Box(-2., +2., shape=(2*n + 6,), dtype=np.float32)

        # define the action space
        action_scale = 5
        action_dim = n
        self.action_space = gym.spaces.Box(-action_scale, +action_scale, shape=(action_dim,), dtype=np.float32)

        # create the underlying physics simulation (using PhysX from Nvidia)
        self.sim = ry.Simulation(self.C, ry.SimulationEngine.physx, verbose=sim_verbose)

    def __del__(self):
        del self.sim
        del self.C

    def step(self, action):
        '''essential to implement gym.Env'''
        self.step_qDelta(action)
        self.time += self.tau_env

        observation = self.observation_fct()
        reward = self.reward_fct()
        terminated = False
        truncated = (self.time >= self.time_limit) # the difference between terminated and truncated matters for the RL algorithm
        info = {"no": "additional info"}
        return observation, reward, terminated, truncated, info

    def observation_fct(self):
        '''observations are joint pos/vel, and box pos/vel'''
        X, q, Xdot, qdot = self.sim.getState()
        box_pos = X[self.box.ID, :3]
        box_vel = Xdot[self.box.ID, 0]
        observation = np.concatenate((q, qdot, box_pos, box_vel), axis=0)
        return observation

    def reward_fct(self):
        '''two reward terms: box-at-goal reward (peaked) and touch object reward (negDistance)'''
        goalDist, _ = self.C.eval(ry.FS.positionDiff, ["box", "target"])
        sigma = .2
        r = 1. - np.tanh(np.linalg.norm(goalDist)/sigma)

        negObjDistance, _ = self.C.eval(ry.FS.negDistance, ["plate", "box"])
        r += negObjDistance[0]
        return r

    def step_qDelta(self, delta):
        '''send deltas in joint space, where the deltas have the semantics of joint velocities'''
        # the sim needs to run at least at 100Hz (to properly simulate collisions),
        # which is why there are multiple sim steps within each env step
        sim_steps = int(self.tau_env/self.tau_sim)
        delta[-1] *= 5. # amplify the last joint's delta (in this scene, the gripper finger joint)
        for s in range(sim_steps):
            # the following converts the deltas to position references for the underlying sim PD controller
            q = self.sim.get_q()
            self.sim.step(q + self.tau_sim * delta, self.tau_sim, ry.ControlMode.position)
            if self.viewSteps:
                self.C.view()
                time.sleep(self.tau_sim)

    def reset(self, seed=None):
        '''essential to implement gym.Env'''
        super().reset(seed=seed)

        X = self.X0.copy()
        if self.random_reset:
            # resetting the box position to a random initial position -- makes it MUCH harder
            X[self.box.ID, :2] += .3 * np.random.randn(2)

        # resetting the sim
        self.time = 0.
        self.sim.resetTime()
        self.sim.setState(X, self.q0)
        self.sim.resetSplineRef()

        observation = self.observation_fct()
        info = {"no": "additional info"}
        return observation, info

    def rollout(self, pi):
        '''helper to play and view a policy'''
        self.viewSteps = True #triggers display within the low level step functions
        obs, info = self.reset()
        t = 0
        R = 0
        while True:
            action = pi(obs, t)
            obs, reward, terminated, truncated, info = self.step(action)
            R += reward
            t += 1
            print("reward: ", reward)
            if terminated or truncated:
                break
        print('total return:', R)
        self.viewSteps = False #disables display within the low level step functions

    def render(self):
        '''also part of the gym.Env API'''
        self.C.view(False, f'RoboticGym time {self.time} / {self.time_limit}')
        if self.render_mode == "rgb_array":
            return self.C.view_getRgb()
Note in particular the reward_fct and observation_fct above, which use the eval method to access geometric features of the configuration.
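To get a feeling for these features, you can also evaluate them directly on the configuration outside of the environment. A small sketch, using the same FS.positionDiff and FS.negDistance features as the reward above:
[ ]:
# evaluate the reward-related features directly on the current configuration
y, J = C.eval(ry.FS.positionDiff, ['box', 'target'])
print('positionDiff box->target:', y, '  Jacobian shape:', J.shape)

y, J = C.eval(ry.FS.negDistance, ['plate', 'box'])
print('negDistance plate-box:', y, '  Jacobian shape:', J.shape)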
Let’s first roll out a trivial policy:
[ ]:
env = RoboticGym(C, time_limit=3., sim_verbose=0) #verbosity of underlying sim - set to 2 once to see actual exploration during training
pi = lambda obs, t : np.array([-1.,0.,-1.,0,0,0,0])
env.rollout(pi)
env.C.view()
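The rollout helper accepts any callable pi(obs, t) that returns a joint-delta action. As a further illustration (a hand-crafted sketch, not a learned policy), here is a time-dependent policy that reverses the joint deltas after half of the 3-second episode:
[ ]:
# a hand-crafted, time-dependent policy: same deltas as above for 1.5 sec, then reversed
def pi_reverse(obs, t):
    delta = np.array([-1.,0.,-1.,0,0,0,0])
    return delta if t < 30 else -delta   # 30 env steps * tau_env (0.05 sec) = 1.5 sec

env.rollout(pi_reverse)
env.C.view()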
And we can train a standard RL algorithm on it:
[ ]:
from stable_baselines3 import SAC, PPO
from stable_baselines3.common.callbacks import CheckpointCallback
import torch
print(torch.__version__)
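Optionally, you can let Stable-Baselines3 check the custom environment against the gym API before training. Note that the checker is strict: with the class above it may complain that the float64 observations returned by observation_fct do not exactly match the declared float32 observation space (casting the observation to np.float32 in observation_fct would silence this; SB3 itself casts observations during training):
[ ]:
# optional: check that the custom env conforms to the gym API (may flag the float64/float32 observation dtype)
from stable_baselines3.common.env_checker import check_env
check_env(env, warn=True)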
[ ]:
checkpoint_callback = CheckpointCallback(save_freq=10000, save_path='./checkpoints_endeffectorActions/', name_prefix='rl_model')
model = SAC("MlpPolicy", env, gamma=0.99, learning_rate=3e-3, verbose=1, tensorboard_log="./tensorboard/")
# model = PPO("MlpPolicy", env, gamma=0.99, learning_rate=3e-3, n_steps=1024, verbose=1, tensorboard_log="./tensorboard/")
model.learn(total_timesteps=1_000, callback=checkpoint_callback)
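After (longer) training runs you typically also want to keep the final model, in addition to the periodic checkpoints. A minimal sketch, with an arbitrary file name:
[ ]:
# save the final model next to the periodic checkpoints (the file name is arbitrary)
model.save('./checkpoints_endeffectorActions/rl_model_final')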
… and roll out the trained policy
[ ]:
# model = SAC.load('./checkpoints_endeffectorActions/rl_model_199498_steps.zip', env=env)
while True:
pi = lambda obs, t : model.predict(obs, deterministic=True)[0] #change deterministic to False to see with noise
env.rollout(pi)
if env.C.view(True, 'press q to stop looping') == ord('q'):
break
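For a quantitative check beyond visual rollouts, SB3's evaluate_policy helper averages the return over several episodes (a sketch; it reuses the same env without rendering, and may warn that the env is not wrapped in a Monitor):
[ ]:
# average the return of the trained policy over a few episodes, without rendering
from stable_baselines3.common.evaluation import evaluate_policy
mean_return, std_return = evaluate_policy(model, env, n_eval_episodes=5, deterministic=True)
print(f'mean return: {mean_return:.3f} +/- {std_return:.3f}')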
[ ]: