Extension - Gym Environment Interface: minimal example
[ ]:
import robotic as ry
import gymnasium as gym
import numpy as np
print('ry version:', ry.__version__, ry.compiled())
[ ]:
# A basic configuration, where the orange box is to be pushed to the target with the stick
C = ry.Config()
C.addFile(ry.raiPath('scenarios/pandaSingle.g'))
C.view(False)
C.addFrame('box') \
    .setShape(ry.ST.ssBox, size=[.1,.1,.1,.005]) .setColor([1,.5,0]) \
    .setPosition([.1,.35,.9]) \
    .setMass(.1)
C.addFrame('stick', 'l_gripper') \
    .setShape(ry.ST.capsule, size=[.3,.02]) .setColor([.5,1,0]) \
    .setRelativePosition([0,0,-.13])
C.addFrame('target') \
    .setShape(ry.ST.marker, size=[.1]) .setColor([0,1,0]) \
    .setPosition([.5,.0,.7])
C.setJointState([.0], ['l_panda_joint2']) #only cosmetics
C.setJointState([.02], ['l_panda_finger_joint1']) #only cosmetics
q0 = C.getJointState()
X0 = C.getFrameState()
C.view()
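[ ]:
# optional quick check of the scene we just built (a sketch, using standard Config/Frame getters)
print('box position:   ', C.getFrame('box').getPosition())
print('target position:', C.getFrame('target').getPosition())
print('initial joint state:', q0)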
[ ]:
# Generic gym environment, instantiating a PhysX multibody simulation with velocity control;
# the arguments C, time_limit, and reward_fct define the problem
class RaiGym(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}
    tau = .05
    time = 0.

    def __init__(self, C, time_limit, reward_fct, render_mode=None):
        self.C = C
        self.time_limit = time_limit
        self.reward_fct = reward_fct

        #self.limits = self.C.getJointLimits()
        self.limits = [-10., 10.]
        self.q0 = self.C.getJointState()
        self.X0 = self.C.getFrameState()

        # observations are joint states, actions are joint velocity commands
        self.observation_space = gym.spaces.Box(self.limits[0], self.limits[1], shape=(self.q0.size,), dtype=np.float32)
        self.action_space = gym.spaces.Box(low=-1., high=1., shape=(self.q0.size,), dtype=np.float32)

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode

        self.sim = ry.Simulation(self.C, ry.SimulationEngine.physx, 0)

    def __del__(self):
        del self.sim
        del self.C

    def step(self, action):
        # apply the joint velocity command for one time step tau
        self.sim.step(action, self.tau, ry.ControlMode.velocity)
        self.time += self.tau

        observation = self.C.getJointState()
        reward = self.reward_fct(self.C)
        terminated = (self.time >= self.time_limit)
        info = {"no": "additional info"}

        return observation, reward, terminated, False, info

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.time = 0.

        # restore the initial frame and joint state in the simulation
        self.sim.setState(self.X0, self.q0)
        self.sim.resetSplineRef()

        observation = self.C.getJointState()
        info = {"no": "additional info"}

        if self.render_mode == "human":
            self.C.view(False)

        return observation, info

    def render(self):
        self.C.view(False, f'RaiGym time {self.time} / {self.time_limit}')
        if self.render_mode == "rgb_array":
            return self.C.view_getRgb()
[ ]:
# reward function: reward touching the box (negDistance is negative while stick and box
# are apart, zero or positive on contact) and penalize the box-target distance
def reward_function(C):
    touch, _ = C.eval(ry.FS.negDistance, ["stick", "box"])
    dist, _ = C.eval(ry.FS.positionDiff, ["box", "target"])
    r = touch[0] - np.linalg.norm(dist)
    return r
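[ ]:
# evaluate the reward once at the initial configuration to get a feeling for its scale
# (typically negative, since the stick starts away from the box and the box away from the target)
print('reward at initial state:', reward_function(C))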
[ ]:
g = RaiGym(C, 10., reward_function)
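[ ]:
# optional sanity check of the Gym API with the stable-baselines3 env checker
# (a sketch, assuming stable-baselines3 is installed; it resets and steps the env with random actions,
#  and may warn that the float64 joint state does not match the float32 observation space)
from stable_baselines3.common.env_checker import check_env
check_env(g, warn=True)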
[ ]:
# basic test: drive the first joint with a constant velocity until the time limit is reached
g.reset()
v = np.zeros(g.q0.size)
v[0] = -1.
print(v)

t = 0
while True:
    t += 1
    ret = g.step(v)
    if ret[2]:
        break
    print("reward: ", ret[1])
    if not (t%10):
        g.render()
[ ]:
# train a stable-baselines3 agent
from stable_baselines3 import A2C, SAC
[ ]:
model = SAC("MlpPolicy", g, verbose=1)
#model = A2C("MlpPolicy", g, verbose=1)
[ ]:
model.learn(total_timesteps=1_000)
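[ ]:
# optional: save the trained policy and reload it later (the file name is just an example)
model.save("sac_rai_push")
# model = SAC.load("sac_rai_push", env=g)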
[ ]:
# play the policy: feed the current observation back into the policy at every step
obs, info = g.reset()
for t in range(100):
    action, _state = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = g.step(action)
    if terminated or truncated:
        break
    if not (t%10):
        g.render()
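[ ]:
# optional: average the return over a few episodes with the stable-baselines3 evaluation helper
# (a sketch; evaluate_policy resets the env internally and may warn that it is not wrapped in a Monitor)
from stable_baselines3.common.evaluation import evaluate_policy
mean_reward, std_reward = evaluate_policy(model, g, n_eval_episodes=3)
print(f"mean reward: {mean_reward:.3f} +/- {std_reward:.3f}")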
[ ]:
del model
del g
del C
[ ]: