Extension - Gym Environment Interface: minimal example

[ ]:
import robotic as ry
import gymnasium as gym
import numpy as np
print('ry version:', ry.__version__, ry.compiled())
[ ]:
# A basic configuration, where the orange box is to be pushed to the target with the stick

C = ry.Config()
C.addFile(ry.raiPath('scenarios/pandaSingle.g'))
C.view(False)

C.addFrame('box') \
    .setShape(ry.ST.ssBox, size=[.1,.1,.1,.005]) .setColor([1,.5,0]) \
    .setPosition([.1,.35,.9]) \
    .setMass(.1)

C.addFrame('stick', 'l_gripper') \
    .setShape(ry.ST.capsule, size=[.3,.02]) .setColor([.5,1,0]) \
    .setRelativePosition([0,0,-.13])

C.addFrame('target') \
    .setShape(ry.ST.marker, size=[.1]) .setColor([0,1,0]) \
    .setPosition([.5,.0,.7])

C.setJointState([.0], ['l_panda_joint2']) #only cosmetics
C.setJointState([.02], ['l_panda_finger_joint1']) #only cosmetics

q0 = C.getJointState()
X0 = C.getFrameState()

C.view()
[ ]:
# Generic gym environment, instantiating a PhysX multibody simulation with velocity control
# the arguments C, time_limit, and reward_fct define the problem

class RaiGym(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}
    tau = .05
    time = 0.

    def __init__(self, C, time_limit, reward_fct, render_mode=None):
        self.C = C
        self.time_limit = time_limit
        self.reward_fct = reward_fct
        #self.limits = self.C.getJointLimits()
        self.limits = [-10., 10.]
        self.q0 = self.C.getJointState()
        self.X0 = self.C.getFrameState()

        self.observation_space = gym.spaces.Box(self.limits[0], self.limits[1], shape=(self.q0.size,), dtype=np.float32)
        self.action_space = gym.spaces.Box(low=-1., high=1., shape=(self.q0.size,), dtype=np.float32)

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode

        self.sim = ry.Simulation(self.C, ry.SimulationEngine.physx, 0)

    def __del__(self):
        del self.sim
        del self.C

    def step(self, action):
        self.sim.step(action, self.tau, ry.ControlMode.velocity)
        self.time += self.tau

        observation = self.C.getJointState()
        reward = self.reward_fct(self.C)
        terminated = (self.time >= self.time_limit)
        info = {"no": "additional info"}

        return observation, reward, terminated, False, info

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)

        self.time = 0.
        self.sim.setState(self.X0, self.q0)
        self.sim.resetSplineRef()

        observation = self.C.getJointState()
        info = {"no": "additional info"}

        if self.render_mode == "human":
            self.C.view(False)

        return observation, info

    def render(self):
        self.C.view(False, f'RaiGym time {self.time} / {self.time_limit}')
        if self.render_mode == "rgb_array":
            return self.C.view_getRgb()

[ ]:
# reward function

def reward_function(C):
    # shaping: negDistance between stick and box rewards contact (it is negative while they are apart),
    # and the box-target distance is subtracted to drive the box towards the target
    touch, _ = C.eval(ry.FS.negDistance, ["stick", "box"])
    dist, _ = C.eval(ry.FS.positionDiff, ["box", "target"])
    r = touch[0] - np.linalg.norm(dist)
    return r
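[ ]:
# sanity check (a minimal sketch using only the frames defined above): the shaped reward on the
# initial configuration should be negative, since the stick is off the box and the box is off the target
print('initial reward:', reward_function(C))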
[ ]:
g = RaiGym(C, 10., reward_function)
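[ ]:
# inspect the Gym interface exposed by the environment (plain attribute access on the class above)
print('observation space:', g.observation_space)
print('action space:', g.action_space)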
[ ]:
# basic test

g.reset()
v = np.zeros(g.q0.size)
v[0] = -1.
print(v)
t = 0
while True:
    t += 1
    ret = g.step(v)
    if ret[2]:
        break
    print("reward: ", ret[1])
    if not (t%10):
        g.render()
[ ]:
# train a Stable Baselines3 agent

from stable_baselines3 import A2C, SAC
[ ]:
model = SAC("MlpPolicy", g, verbose=1)
#model = A2C("MlpPolicy", g, verbose=1)
[ ]:
model.learn(total_timesteps=1_000)
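[ ]:
# optionally save the trained policy for later reuse (a sketch; the filename 'sac_raigym' is arbitrary)
model.save("sac_raigym")
#model = SAC.load("sac_raigym", env=g)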
[ ]:
# play the policy

obs, info = g.reset()
for t in range(100):
    action, _state = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = g.step(action)
    if not (t%10):
        g.render()
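[ ]:
# quick check: reward reached at the end of the rollout (uses the variables from the loop above)
print('final reward:', reward)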
[ ]:
del model
del g
del C
[ ]: