58
58
)
59
59
from lerobot .common .teleoperators import (
60
60
gamepad , # noqa: F401
61
+ keyboard , # noqa: F401
61
62
make_teleoperator_from_config ,
62
63
so101_leader , # noqa: F401
63
64
)
64
65
from lerobot .common .teleoperators .gamepad .teleop_gamepad import GamepadTeleop
66
+ from lerobot .common .teleoperators .keyboard .teleop_keyboard import KeyboardEndEffectorTeleop
65
67
from lerobot .common .utils .robot_utils import busy_wait
66
68
from lerobot .common .utils .utils import log_say
67
69
from lerobot .configs import parser
@@ -1191,7 +1193,7 @@ def _init_keyboard_events(self):
1191
1193
"rerecord_episode" : False ,
1192
1194
}
1193
1195
1194
- def _handle_key_press (self , key , keyboard ):
1196
+ def _handle_key_press (self , key , keyboard_device ):
1195
1197
"""
1196
1198
Handle key press events.
1197
1199
@@ -1202,10 +1204,10 @@ def _handle_key_press(self, key, keyboard):
1202
1204
This method should be overridden in subclasses for additional key handling.
1203
1205
"""
1204
1206
try :
1205
- if key == keyboard .Key .esc :
1207
+ if key == keyboard_device .Key .esc :
1206
1208
self .keyboard_events ["episode_end" ] = True
1207
1209
return
1208
- if key == keyboard .Key .left :
1210
+ if key == keyboard_device .Key .left :
1209
1211
self .keyboard_events ["rerecord_episode" ] = True
1210
1212
return
1211
1213
if hasattr (key , "char" ) and key .char == "s" :
@@ -1221,13 +1223,13 @@ def _init_keyboard_listener(self):
1221
1223
1222
1224
This method sets up keyboard event handling if not in headless mode.
1223
1225
"""
1224
- from pynput import keyboard
1226
+ from pynput import keyboard as keyboard_device
1225
1227
1226
1228
def on_press (key ):
1227
1229
with self .event_lock :
1228
- self ._handle_key_press (key , keyboard )
1230
+ self ._handle_key_press (key , keyboard_device )
1229
1231
1230
- self .listener = keyboard .Listener (on_press = on_press )
1232
+ self .listener = keyboard_device .Listener (on_press = on_press )
1231
1233
self .listener .start ()
1232
1234
1233
1235
def _check_intervention (self ):
@@ -1403,7 +1405,7 @@ def _init_keyboard_events(self):
1403
1405
super ()._init_keyboard_events ()
1404
1406
self .keyboard_events ["human_intervention_step" ] = False
1405
1407
1406
- def _handle_key_press (self , key , keyboard ):
1408
+ def _handle_key_press (self , key , keyboard_device ):
1407
1409
"""
1408
1410
Handle key presses including space for intervention toggle.
1409
1411
@@ -1413,8 +1415,8 @@ def _handle_key_press(self, key, keyboard):
1413
1415
1414
1416
Extends the base handler to respond to space key for toggling intervention.
1415
1417
"""
1416
- super ()._handle_key_press (key , keyboard )
1417
- if key == keyboard .Key .space :
1418
+ super ()._handle_key_press (key , keyboard_device )
1419
+ if key == keyboard_device .Key .space :
1418
1420
if not self .keyboard_events ["human_intervention_step" ]:
1419
1421
logging .info (
1420
1422
"Space key pressed. Human intervention required.\n "
@@ -1574,7 +1576,7 @@ def __init__(
1574
1576
print (" Y/Triangle button: End episode (SUCCESS)" )
1575
1577
print (" B/Circle button: Exit program" )
1576
1578
1577
- def get_gamepad_action (
1579
+ def get_teleop_commands (
1578
1580
self ,
1579
1581
) -> tuple [bool , np .ndarray , bool , bool , bool ]:
1580
1582
"""
@@ -1643,7 +1645,7 @@ def step(self, action):
1643
1645
terminate_episode ,
1644
1646
success ,
1645
1647
rerecord_episode ,
1646
- ) = self .get_gamepad_action ()
1648
+ ) = self .get_teleop_commands ()
1647
1649
1648
1650
# Update episode ending state if requested
1649
1651
if terminate_episode :
@@ -1700,6 +1702,90 @@ def close(self):
1700
1702
return self .env .close ()
1701
1703
1702
1704
1705
+ class KeyboardControlWrapper (GamepadControlWrapper ):
1706
+ """
1707
+ Wrapper that allows controlling a gym environment with a keyboard.
1708
+
1709
+ This wrapper intercepts the step method and allows human input via keyboard
1710
+ to override the agent's actions when desired.
1711
+
1712
+ Inherits from GamepadControlWrapper to avoid code duplication.
1713
+ """
1714
+
1715
+ def __init__ (
1716
+ self ,
1717
+ env ,
1718
+ teleop_device , # Accepts an instantiated teleoperator
1719
+ use_gripper = False , # This should align with teleop_device's config
1720
+ auto_reset = False ,
1721
+ ):
1722
+ """
1723
+ Initialize the gamepad controller wrapper.
1724
+
1725
+ Args:
1726
+ env: The environment to wrap.
1727
+ teleop_device: The instantiated teleoperation device (e.g., GamepadTeleop).
1728
+ use_gripper: Whether to include gripper control (should match teleop_device.config.use_gripper).
1729
+ auto_reset: Whether to auto reset the environment when episode ends.
1730
+ """
1731
+ super ().__init__ (env , teleop_device , use_gripper , auto_reset )
1732
+
1733
+ self .is_intervention_active = False
1734
+
1735
+ logging .info ("Keyboard control wrapper initialized with provided teleop_device." )
1736
+ print ("Keyboard controls:" )
1737
+ print (" Arrow keys: Move in X-Y plane" )
1738
+ print (" Shift and Shift_R: Move in Z axis" )
1739
+ print (" Right Ctrl and Left Ctrl: Open and close gripper" )
1740
+ print (" f: End episode with FAILURE" )
1741
+ print (" s: End episode with SUCCESS" )
1742
+ print (" r: End episode with RERECORD" )
1743
+ print (" i: Start/Stop Intervention" )
1744
+
1745
+ def get_teleop_commands (
1746
+ self ,
1747
+ ) -> tuple [bool , np .ndarray , bool , bool , bool ]:
1748
+ action_dict = self .teleop_device .get_action ()
1749
+ episode_end_status = None
1750
+
1751
+ # Unroll the misc_keys_queue to check for events related to intervention, episode success, etc.
1752
+ while not self .teleop_device .misc_keys_queue .empty ():
1753
+ key = self .teleop_device .misc_keys_queue .get ()
1754
+ if key == "i" :
1755
+ self .is_intervention_active = not self .is_intervention_active
1756
+ elif key == "f" :
1757
+ episode_end_status = "failure"
1758
+ elif key == "s" :
1759
+ episode_end_status = "success"
1760
+ elif key == "r" :
1761
+ episode_end_status = "rerecord_episode"
1762
+
1763
+ terminate_episode = episode_end_status is not None
1764
+ success = episode_end_status == "success"
1765
+ rerecord_episode = episode_end_status == "rerecord_episode"
1766
+
1767
+ # Convert action_dict to numpy array based on expected structure
1768
+ # Order: delta_x, delta_y, delta_z, gripper (if use_gripper)
1769
+ action_list = [action_dict ["delta_x" ], action_dict ["delta_y" ], action_dict ["delta_z" ]]
1770
+ if self .use_gripper :
1771
+ # GamepadTeleop returns gripper action as 0 (close), 1 (stay), 2 (open)
1772
+ # This needs to be consistent with what EEActionWrapper expects if it's used downstream
1773
+ # EEActionWrapper for gripper typically expects 0.0 (closed) to 2.0 (open)
1774
+ # For now, we pass the direct value from GamepadTeleop, ensure downstream compatibility.
1775
+ gripper_val = action_dict .get ("gripper" , 1.0 ) # Default to 1.0 (stay) if not present
1776
+ action_list .append (float (gripper_val ))
1777
+
1778
+ gamepad_action_np = np .array (action_list , dtype = np .float32 )
1779
+
1780
+ return (
1781
+ self .is_intervention_active ,
1782
+ gamepad_action_np ,
1783
+ terminate_episode ,
1784
+ success ,
1785
+ rerecord_episode ,
1786
+ )
1787
+
1788
+
1703
1789
class GymHilDeviceWrapper (gym .Wrapper ):
1704
1790
def __init__ (self , env , device = "cpu" ):
1705
1791
super ().__init__ (env )
@@ -1843,6 +1929,15 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env:
1843
1929
teleop_device = teleop_device ,
1844
1930
use_gripper = cfg .wrapper .use_gripper ,
1845
1931
)
1932
+ elif control_mode == "keyboard_ee" :
1933
+ assert isinstance (teleop_device , KeyboardEndEffectorTeleop ), (
1934
+ "teleop_device must be an instance of KeyboardEndEffectorTeleop for keyboard control mode"
1935
+ )
1936
+ env = KeyboardControlWrapper (
1937
+ env = env ,
1938
+ teleop_device = teleop_device ,
1939
+ use_gripper = cfg .wrapper .use_gripper ,
1940
+ )
1846
1941
elif control_mode == "leader" :
1847
1942
env = GearedLeaderControlWrapper (
1848
1943
env = env ,
0 commit comments