Extension - Gym Environment Interface: minimal example

[ ]:
import robotic as ry
import gymnasium as gym
import numpy as np
import time
print('ry version:', ry.__version__, ry.compiled())

Let’s first create a configuration

[ ]:
C = ry.Config()
C.addFile(ry.raiPath('scenarios/pandaSingle.g'))
C.setJointState([.007], ['l_panda_finger_joint1']) #only cosmetics
#the following makes it a LOT simpler
C.setJointState([.0], ['l_panda_joint2'])
C.setJointState([.7], ['l_panda_joint7'])
gripper = 'l_gripper'

box = C.addFrame('box') \
    .setShape(ry.ST.ssBox, size=[.1,.1,.1,.005]) .setColor([1,.5,0]) \
    .setPosition([.06,.35,.7]) \
    .setMass(.1) \
    .setAttributes({'friction': .5})
box.setPosition([.1,.35,.7]) # this is the initial position of the box - change manually to make harder

C.addFrame('plate', gripper) \
    .setShape(ry.ST.ssBox, size=[.02,.2,.36,.005]) .setColor([.5,1,0]) \
    .setRelativePosition([0,0,-.16]) \
    .setAttributes({'friction': .1})

C.addFrame('target') \
    .setShape(ry.ST.marker, size=[.1]) .setColor([0,1,0]) \
    .setPosition([.3,.3,.7])

C.view()

Here is a basic class implementing a gym.Env:

[ ]:
class RoboticGym(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}
    tau_env = .05 #a gym environment step corresponds to 0.05 sec
    tau_sim = .01 #the underlying physics simulation is stepped with 0.01 sec (5 sim steps in each env step)
    time = 0.
    render_mode = 'human'

    viewSteps = False
    random_reset = False

    def __init__(self, C:ry.Config, time_limit, sim_verbose=0):
        self.time_limit = time_limit

        # create/load the robotic configuration
        self.C = C
        self.q0 = self.C.getJointState()
        self.X0 = self.C.getFrameState()
        self.box = self.C.getFrame('box')
        n = self.q0.size

        # define the observation space (see also observation_fct())
        self.observation_space = gym.spaces.Box(-2., +2., shape=(2*n + 6,), dtype=np.float32)

        # define the action space
        action_scale = 5
        action_dim = n
        self.action_space = gym.spaces.Box(-action_scale, +action_scale, shape=(action_dim,), dtype=np.float32)

        # create the underlying physics simulation (using PhysX from Nvidia)
        self.sim = ry.Simulation(self.C, ry.SimulationEngine.physx, verbose=sim_verbose)

    def __del__(self):
        del self.sim
        del self.C

    def step(self, action):
        '''essential to implement gym.Env'''
        self.step_qDelta(action)

        self.time += self.tau_env

        observation = self.observation_fct()
        reward = self.reward_fct()
        terminated = False
        truncated = (self.time >= self.time_limit) # terminated and truncated difference is super important
        info = {"no": "additional info"}
        return observation, reward, terminated, truncated, info

    def observation_fct(self):
        '''observations are joint pos/vel, and box pos/vel'''
        X, q, Xdot, qdot = self.sim.getState()
        box_pos = X[self.box.ID, :3]
        box_vel = Xdot[self.box.ID, 0]
        observation = np.concatenate((q, qdot, box_pos, box_vel), axis=0)
        return observation

    def reward_fct(self):
        '''two reward terms: box-at-goal reward (peaked) and touch object reward (negDistance)'''
        goalDist, _ = self.C.eval(ry.FS.positionDiff, ["box", "target"])
        sigma = .2
        r = 1. - np.tanh(np.linalg.norm(goalDist)/sigma)

        negObjDistance, _ = self.C.eval(ry.FS.negDistance, ["plate", "box"])
        r += negObjDistance[0]
        return r

    def step_qDelta(self, delta):
        '''sending deltas in joint space; where deltas have semantic of velocity'''
        # the sim needs to run at least with 100Hz (to properly simulate collisions)
        # that's why we have multiple sim steps for each env step
        sim_steps = int(self.tau_env/self.tau_sim)
        delta[-1] *= 5.
        for s in range(sim_steps):
            #the following converts the deltas to position references for the underlying sim PD controller
            q = self.sim.get_q()
            self.sim.step(q + self.tau_sim * delta, self.tau_sim, ry.ControlMode.position)
            if self.viewSteps:
                self.C.view()
                time.sleep(self.tau_sim)

    def reset(self, seed=None):
        '''essential to implement gym.Env'''
        super().reset(seed=seed)

        X = self.X0.copy()
        if self.random_reset:
            # resetting the box position to a random initial position -- makes it MUCH harder
            X[self.box.ID, :2] += .3 * np.random.randn(2)

        # resetting the sim
        self.time = 0.
        self.sim.resetTime()
        self.sim.setState(X, self.q0)
        self.sim.resetSplineRef()

        observation = self.observation_fct()
        info = {"no": "additional info"}
        return observation, info

    def rollout(self, pi):
        '''helper to play and view a policy'''
        self.viewSteps=True #triggers display within the low level step functions
        obs, info = self.reset()
        t = 0
        R = 0
        while True:
            action = pi(obs, t)
            obs, reward, terminated, truncated, info = self.step(action)
            R += reward
            t += 1
            print("reward: ", reward)
            if terminated or truncated:
                break
        print('total return:', R)
        self.viewSteps=False #disables display within the low level step functions

    def render(self):
        '''also part of the env.Gym'''
        self.C.view(False, f'RoboticGym time {self.time} / {self.time_limit}')
        if self.render_mode == "rgb_array":
            return self.C.view_getRgb()

Note in particular the reward_fct and observation_fct above, using the eval methods to access geometric features.

Let’s first rollout a trivial policy:

[ ]:
env = RoboticGym(C, time_limit=3., sim_verbose=0)  #verbosity of underlying sim - set to 2 once to see actual exploration during training

pi = lambda obs, t : np.array([-1.,0.,-1.,0,0,0,0])
env.rollout(pi)
env.C.view()

And we can train standard RL:

[ ]:
from stable_baselines3 import SAC, PPO
from stable_baselines3.common.callbacks import CheckpointCallback
import torch
print(torch.__version__)
[ ]:
checkpoint_callback = CheckpointCallback(save_freq=10000, save_path='./checkpoints_endeffectorActions/', name_prefix='rl_model')

model = SAC("MlpPolicy", env, gamma=0.99, learning_rate=3e-3, verbose=1, tensorboard_log="./tensorboard/")
# model = PPO("MlpPolicy", env, gamma=0.99, learning_rate=3e-3, n_steps=1024, verbose=1, tensorboard_log="./tensorboard/")

model.learn(total_timesteps=1_000, callback=checkpoint_callback)

… and roll out the trained policy

[ ]:
# model = SAC.load('./checkpoints_endeffectorActions/rl_model_199498_steps.zip', env=env)

while True:
    pi = lambda obs, t : model.predict(obs, deterministic=True)[0] #change deterministic to False to see with noise
    env.rollout(pi)
    if env.C.view(True, 'press q to stop looping') == ord('q'):
        break
[ ]: