@@ -13,7 +13,7 @@ import isaaclab.sim as sim_utils
from isaaclab . assets import Articulation
from isaaclab . assets import Articulation
from isaaclab . envs import DirectRLEnv
from isaaclab . envs import DirectRLEnv
from isaaclab . sim . spawners . from_files import GroundPlaneCfg , spawn_ground_plane
from isaaclab . sim . spawners . from_files import GroundPlaneCfg , spawn_ground_plane
from isaaclab . utils . math import sample_uniform , euler_xyz_from_quat
from isaaclab . utils . math import sample_uniform , euler_xyz_from_quat , quat_apply_inverse
from isaaclab . sensors import RayCaster , RayCasterCfg , patterns
from isaaclab . sensors import RayCaster , RayCasterCfg , patterns
from isaaclab . markers import VisualizationMarkers , VisualizationMarkersCfg
from isaaclab . markers import VisualizationMarkers , VisualizationMarkersCfg
from isaaclab . utils . assets import ISAAC_NUCLEUS_DIR
from isaaclab . utils . assets import ISAAC_NUCLEUS_DIR
@@ -46,12 +46,12 @@ def define_markers() -> VisualizationMarkers:
# TODO 注意传感器父框架的配置
# TODO 注意传感器父框架的配置
def define_height_sensor ( ) - > RayCaster :
def define_height_sensor ( ) - > RayCaster :
height_sensor_cfg = RayCasterCfg (
height_sensor_cfg = RayCasterCfg (
prim_path = " /World/envs/env_.*/Robot/body " , # 明确路径
prim_path = " /World/envs/env_.*/Robot/body " ,
update_period = 0.02 ,
update_period = 0.02 , # 50Hz
offset = RayCasterCfg . OffsetCfg ( pos = ( 0.0 , 0.0 , 1 .0) ) , # 更合理的初始高度
offset = RayCasterCfg . OffsetCfg ( pos = ( 0.0 , 0.0 , 0 .0) ) ,
attach_yaw_only = True ,
attach_yaw_only = True ,
pattern_cfg = patterns . GridPatternCfg ( resolution = 0.1 , size = [ 1.0 , 1.0 ] ) , # type: ignore
pattern_cfg = patterns . GridPatternCfg ( resolution = 0.1 , size = [ 1.0 , 1.0 ] ) , # type: ignore # TODO 修改传感器尺寸
debug_vis = Tru e,
debug_vis = Fals e,
mesh_prim_paths = [ " /World/ground " ] , # 确认的实际地面路径
mesh_prim_paths = [ " /World/ground " ] , # 确认的实际地面路径
)
)
return RayCaster ( cfg = height_sensor_cfg )
return RayCaster ( cfg = height_sensor_cfg )
@@ -91,19 +91,34 @@ class FlexrV0Env(DirectRLEnv):
self . _actuator_joint_indices [ " wheel_BR " ]
self . _actuator_joint_indices [ " wheel_BR " ]
)
)
# 调试打印所有映射
# # 调试打印所有映射
logging. debug ( " Actuator Group to Joint Indices:" , self . _actuator_joint_indices )
# logging.debug(" Actuator Group to Joint Indices:", self. _actuator_joint_indices)
# 初始化状态变量
self . joint_pos = self . robot . data . joint_pos
self . joint_vel = self . robot . data . joint_vel
# 中间变量
# 中间变量
self . dt = self . cfg . sim . dt
# 指令 - 线速度仅跟踪 x 和 y 分量;角速度仅跟踪 z 分量
# FIXME 暂时初始化为0值
self . cmd_ang_vel = torch . zeros_like ( self . robot . data . root_ang_vel_w )
self . cmd_lin_vel = torch . zeros_like ( self . robot . data . root_lin_vel_w )
# 角速度
# 角速度
self . _last_root_ang_vel = torch . zeros_like ( self . robot . data . root_ang_vel_w )
self . _last_root_ang_vel = torch . zeros_like ( self . robot . data . root_ang_vel_w )
# 角度
# 角度
self . orientations = torch . zeros_like ( self . robot . data . root_quat_w )
self . orientations = torch . zeros_like ( self . robot . data . root_quat_w )
# 线速度 [num_envs, 3]
self . base_lin_vel = torch . zeros_like ( self . robot . data . root_lin_vel_w )
# 角速度 [num_envs, 3]
self . base_ang_vel = torch . zeros_like ( self . robot . data . root_ang_vel_w )
# 关节速度 [num_envs, n_joints]
self . joint_vel = torch . zeros_like ( self . robot . data . joint_vel )
# 关节角度 [num_envs, n_joints]
self . joint_pos = torch . zeros_like ( self . robot . data . joint_pos )
# 关节扭矩 [num_envs, n_joints]
self . joint_torque = torch . zeros_like ( self . robot . data . applied_torque )
# 初始化状态变量
self . joint_pos = self . robot . data . joint_pos
self . joint_vel = self . robot . data . joint_vel
def _get_wheel_joint_indices ( self , prefix : str , joint_names : list [ str ] ) - > list [ int ] :
def _get_wheel_joint_indices ( self , prefix : str , joint_names : list [ str ] ) - > list [ int ] :
""" 获取指定腿部的所有轮子关节索引(返回整数列表) """
""" 获取指定腿部的所有轮子关节索引(返回整数列表) """
indices = [ ]
indices = [ ]
@@ -134,7 +149,6 @@ class FlexrV0Env(DirectRLEnv):
def _pre_physics_step ( self , actions : torch . Tensor ) - > None :
def _pre_physics_step ( self , actions : torch . Tensor ) - > None :
self . actions = actions . clone ( )
self . actions = actions . clone ( )
self . orientations = self . robot . data . root_quat_w
def _apply_action ( self ) - > None :
def _apply_action ( self ) - > None :
@@ -144,6 +158,8 @@ class FlexrV0Env(DirectRLEnv):
if self . actions . dim ( ) == 1 :
if self . actions . dim ( ) == 1 :
self . actions = self . actions . unsqueeze ( 0 )
self . actions = self . actions . unsqueeze ( 0 )
self . last_actions = self . actions . clone ( )
# 分离动作分量
# 分离动作分量
arm_actions = self . actions [ : , : 4 ] # 前4个分量控制摆臂 (FL, FR, BL, BR)
arm_actions = self . actions [ : , : 4 ] # 前4个分量控制摆臂 (FL, FR, BL, BR)
wheel_actions = self . actions [ : , 4 : 8 ] # 后4个分量控制轮组 (FL, FR, BL, BR)
wheel_actions = self . actions [ : , 4 : 8 ] # 后4个分量控制轮组 (FL, FR, BL, BR)
@@ -174,91 +190,98 @@ class FlexrV0Env(DirectRLEnv):
for i , group in enumerate ( wheel_groups ) :
for i , group in enumerate ( wheel_groups ) :
joint_indices = self . _actuator_joint_indices [ group ] # 全局索引
joint_indices = self . _actuator_joint_indices [ group ] # 全局索引
wheel_vel_target [ : , joint_indices ] = wheel_actions [ : , i ] . unsqueeze ( - 1 )
wheel_vel_target [ : , joint_indices ] = wheel_actions [ : , i ] . unsqueeze ( - 1 )
self . robot . set_joint_velocity_target (
wheel_vel_target [ : , self . _all_wheel_joint_indices ] , # 只选择轮组部分
joint_ids = self . _all_wheel_joint_indices
)
# print(f"wheel_actions: {wheel_actions}")
# print(f"wheel_actions: {wheel_actions}")
# print(f"actions: {self.actions}")
# print(f"actions: {self.actions}")
# print(f"Arm Pos Target: {arm_pos_target}")
# print(f"Arm Pos Target: {arm_pos_target}")
# print(f"Wheel Vel Target: {wheel_vel_target}")
# print(f"Wheel Vel Target: {wheel_vel_target}")
# self.robot.set_joint_velocity_target(
# wheel_vel_target[:, self._all_wheel_joint_indices], # 只选择轮组部分
# joint_ids=self._all_wheel_joint_indices
# )
def _get_observations ( self ) - > dict :
def _get_observations ( self ) - > dict :
# 获取摆臂关节速度
# 计算自身坐标系重力的矢量值
# 世界坐标系下的重力向量 (0, 0, -1)
gravity_world = torch . tensor ( [ 0.0 , 0.0 , - 1.0 ] , device = self . device ) . repeat ( self . num_envs , 1 )
# 转换到机体坐标系
gravity_body = quat_apply_inverse ( self . orientations , gravity_world )
# arm 关节的位置和速度
arm_pos = self . joint_pos [ : , self . _arm_joint_indices ] # [num_envs, 4]
arm_vel = self . joint_vel [ : , self . _arm_joint_indices ] # [num_envs, 4]
arm_vel = self . joint_vel [ : , self . _arm_joint_indices ] # [num_envs, 4]
# 计算每个轮组的平均 速度
# 轮组 速度
wheel_FL_vel = self . joint_vel [ : , self . _actuator_joint_indices [ " wheel_FL " ] ] . mean ( dim = 1 , keepdim = True )
wheel_FL_vel = self . joint_vel [ : , self . _actuator_joint_indices [ " wheel_FL " ] ] . mean ( dim = 1 , keepdim = True )
wheel_FR_vel = self . joint_vel [ : , self . _actuator_joint_indices [ " wheel_FR " ] ] . mean ( dim = 1 , keepdim = True )
wheel_FR_vel = self . joint_vel [ : , self . _actuator_joint_indices [ " wheel_FR " ] ] . mean ( dim = 1 , keepdim = True )
wheel_BL_vel = self . joint_vel [ : , self . _actuator_joint_indices [ " wheel_BL " ] ] . mean ( dim = 1 , keepdim = True )
wheel_BL_vel = self . joint_vel [ : , self . _actuator_joint_indices [ " wheel_BL " ] ] . mean ( dim = 1 , keepdim = True )
wheel_BR_vel = self . joint_vel [ : , self . _actuator_joint_indices [ " wheel_BR " ] ] . mean ( dim = 1 , keepdim = True )
wheel_BR_vel = self . joint_vel [ : , self . _actuator_joint_indices [ " wheel_BR " ] ] . mean ( dim = 1 , keepdim = True )
wheel_vel = torch . cat ( [ wheel_FL_vel , wheel_FR_vel , wheel_BL_vel , wheel_BR_vel ] , dim = 1 ) # [num_envs, 4]
wheel_vel = torch . cat ( [ wheel_FL_vel , wheel_FR_vel , wheel_BL_vel , wheel_BR_vel ] , dim = 1 ) # [num_envs, 4]
# 获取车体姿态和运动信息
base_quat = self . robot . data . root_quat_w # [num_envs, 4] (w,x,y,z)
base_ang_vel = self . robot . data . root_ang_vel_w # [num_envs, 3] (x,y,z)
# 将四元数转换为欧拉角( 使用Isaac Lab的函数)
# 先前动作
roll , pitch , _ = euler_xyz_from_quat ( base_quat ) # 返回弧度值
last_actions = self . last_actions if hasattr ( self , ' last_actions ' ) else torch . zeros_like ( self . actions )
# # 归一化角度到[-1,1]范围(假设±π/2是最大范围)
# 指令xy线速度和指令角速度
# norm_pitch = pitch / (math.pi/2) # 归一化到[-1,1 ]
cmd_lin_vel_xy = self . cmd_lin_vel [ : , : 2 ] # [num_envs, 2 ]
# norm_roll = roll / (math.pi/2) # 归一化到[-1, 1]
cmd_ang_vel_z = self . cmd_ang_vel [ : , 2 : 3 ] # [num_envs, 1]
norm_pitch = pitch / ( math . pi ) # 归一化到[-1,1]
norm_roll = roll / ( math . pi ) # 归一化到[-1,1]
# 添加噪声(可选)
# # 高程图 (需要从height_sensor获取)
noise_std = 0.01 # 噪声标准差
# # 确保高度传感器数据是最新的
norm_pitch + = torch . randn_like ( norm_pitch ) * noise_std
# self.height_sensor._update_outdated_buffers()
norm_roll + = torch . randn_like ( norm_roll ) * noise_std
# # 获取高度数据并展平 (假设每个环境有121个高度点)
# height_scan = self.height_sensor.data.ray_hits_w[..., 2] # [num_envs, 121]
# # 归一化高度数据
# height_scan = (height_scan - height_scan.mean(dim=1, keepdim=True)) / (height_scan.std(dim=1, keepdim=True) + 1e-6)
# 组合所有观测
# 组合所有观测
obs = torch . cat (
obs = torch . cat (
(
(
self . base_lin_vel , # 机体线速度 [num_envs, 3]
self . base_ang_vel , # 机体角速度 [num_envs, 3]
gravity_body , # 重力矢量 [num_envs, 3]
arm_pos , # 摆臂位置 [num_envs, 4]
arm_vel , # 摆臂速度 [num_envs, 4]
arm_vel , # 摆臂速度 [num_envs, 4]
wheel_vel , # 轮组平均 速度 [num_envs, 4]
wheel_vel , # 轮组速度 [num_envs, 4]
norm_pitch . unsqueeze ( - 1 ) , # 归一化pitch [num_envs, 1 ]
last_actions , # 先前动作 [num_envs, 8 ]
norm_roll . unsqueeze ( - 1 ) , # 归一化roll [num_envs, 1 ]
cmd_lin_vel_xy , # 指令线速度xy [num_envs, 2 ]
base _ang_vel # 三轴 角速度 [num_envs, 3 ]
cmd _ang_vel_z , # 指令 角速度z [num_envs, 1 ]
# height_scan # 高程图 [num_envs, 121]
) ,
) ,
dim = - 1 ,
dim = - 1 ,
)
)
observations = { " policy " : obs }
observations = { " policy " : obs }
return observations
return observations
def _get_rewards ( self ) - > torch . Tensor :
# 获取当前角速度
current_ang_vel = self . robot . data . root_ang_vel_w
# 计算角加速度 (当前角速度 - 上一帧角速度) / dt
root_ang_acc = ( current_ang_vel - self . _last_root_ang_vel ) / self . step_dt
# 更新上一帧角速度
self . _last_root_ang_vel [ : ] = current_ang_vel
# 目标前进速度 (可以根据需要调整)
target_velocity = torch . ones ( self . num_envs , 1 , device = self . device ) * self . cfg . target_velocity
# 计算奖励
def _get_rewards ( self ) - > torch . Tensor :
self . joint_pos = self . robot . data . joint_pos
self . joint_vel = self . robot . data . joint_vel
self . joint_torque = self . robot . data . applied_torque
self . base_lin_vel = self . robot . data . root_lin_vel_w
self . base_ang_vel = self . robot . data . root_ang_vel_w
self . orientations = self . robot . data . root_quat_w
total_reward = compute_rewards (
total_reward = compute_rewards (
root_ang_acc = root_ang_acc ,
self . dt ,
last_root_ang_vel = self . _last_root_ang _vel,
self . cmd_lin _vel,
dt = self . step_dt ,
self . cmd_ang_vel ,
rew_scale_smoothness = self . cfg . rew_scale_smoothness ,
self . base_lin_vel ,
rew_scale_alive = self . cfg . rew_scale_ali ve,
self . base_ang_ vel ,
rew_scale_velocity = self . cfg . rew_scale_velocity ,
self . joint_vel ,
base_lin_vel = self . robot . data . root_lin_vel_w ,
self . joint_torque ,
target_velocity = target_velocity ,
self . actions ,
terminated = self . reset_terminated
self . last_actions ,
self . cfg . rew_scale_lin_vel ,
self . cfg . rew_scale_ang_vel ,
self . cfg . rew_scale_z ,
self . cfg . rew_scale_orientation ,
self . cfg . rew_scale_joint_motion ,
self . cfg . rew_scale_joint_torque ,
self . cfg . rew_scale_action_rate ,
)
)
# logging.info(f"Computed rewards mean: {total_reward.mean().item()}" )
return total_reward . reshape ( - 1 )
return total_reward
# def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
# def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
# self.joint_pos = self.robot.data.joint_pos
# self.joint_pos = self.robot.data.joint_pos
@@ -319,7 +342,7 @@ class FlexrV0Env(DirectRLEnv):
def _check_robot_flipped ( self ) - > torch . Tensor :
def _check_robot_flipped ( self ) - > torch . Tensor :
""" 改进的 倾覆检测方法, 正确处理0/360度问题 """
""" 倾覆检测 """
# 获取欧拉角 (弧度)
# 获取欧拉角 (弧度)
roll , pitch , _ = euler_xyz_from_quat ( self . orientations )
roll , pitch , _ = euler_xyz_from_quat ( self . orientations )
@@ -343,15 +366,6 @@ class FlexrV0Env(DirectRLEnv):
print ( f " Robot flipped at idx: { torch . nonzero ( flipped ) } [x: { roll_deg [ flipped ] } , y: { pitch_deg [ flipped ] } ] " )
print ( f " Robot flipped at idx: { torch . nonzero ( flipped ) } [x: { roll_deg [ flipped ] } , y: { pitch_deg [ flipped ] } ] " )
return flipped
return flipped
# def _get_velocities(self, env_ids = 0) -> tuple[torch.Tensor, torch.Tensor]:
# return self.joint_vel[env_ids, :3], self.joint_vel[env_ids, 3:]
# def _get_positions(self, env_ids = 0) -> tuple[torch.Tensor, torch.Tensor]:
# return self.joint_pos[env_ids, :3], self.joint_pos[env_ids, 3:]
# def _get_orientations(self, env_ids = 0) -> tuple[torch.Tensor, torch.Tensor]:
# return self.orientations[env_ids, :3], self.orientations[env_ids, 3:]
def _debug_print_idx ( self , env_ids : Sequence [ int ] ) :
def _debug_print_idx ( self , env_ids : Sequence [ int ] ) :
logging . debug ( f " env_ids: { env_ids } " )
logging . debug ( f " env_ids: { env_ids } " )
@@ -360,50 +374,75 @@ class FlexrV0Env(DirectRLEnv):
# logging.debug(f"orientations: {self.orientations[env_ids]}")
# logging.debug(f"orientations: {self.orientations[env_ids]}")
logging . debug ( f " euler_angles: { euler_xyz_from_quat ( self . orientations [ env_ids ] ) } " )
logging . debug ( f " euler_angles: { euler_xyz_from_quat ( self . orientations [ env_ids ] ) } " )
@torch.jit.script
@torch.jit.script
def compute_rewards (
def compute_rewards (
root_ang_acc : torch . Tensor , # 根部角加速度 [num_envs, 3] (roll, pitch, yaw)
# 输入参数
last_root_ang_vel : torch . Tensor , # 上一帧的根部角速度 [num_envs, 3]
dt : float ,
dt : float , # 时间步长
cmd_lin_vel : torch . Tensor ,
rew_scale_smoothness : float , # 平滑性奖励系数
cmd_ang_vel : torch . Tensor ,
rew_scal e_a live: float , # 存活奖励系数
bas e_lin_ vel : torch . Tensor ,
rew_scale_velocity : float , # 速度奖励系数
base_ang_vel : torch . Tensor ,
base_l in_vel: torch . Tensor , # 基础线速度 [num_envs, 3]
jo int _vel: torch . Tensor ,
target_velocity : torch . Tensor , # 目标速度 [num_envs, 1]
joint_torque : torch . Tensor ,
terminated : torch . Tensor # 终止标志 [num_envs]
actions : torch . Tensor ,
) - > torch . Tensor :
last_actions : torch . Tensor ,
"""
# 奖励参数
计算包含平顺性奖励的总奖励
rew_scale_lin_vel : float ,
rew_scale_ang_vel : float ,
参数:
rew_scale_z : float ,
root_ang_acc: 当前角加速度 (通过当前角速度与上一帧角速度计算得到)
rew_scale_orientation : float ,
last_root_ang_vel: 上一帧的角速度
rew_scale_joint_motion : float ,
dt: 时间步长
rew_scale_joint_torque : float ,
rew_scale_smoothness: 平滑性奖励权重
rew_scale_action_rate : float ,
rew_scale_alive: 存活奖励权重
) - > torch . Tensor :
rew_scale_velocity: 速度跟踪奖励权重
# 线速度/角速度跟踪(计算两个向量之间的欧几里得距离)
base_lin_vel: 当前线速度
sigma_squared = 0.25
target_velocity: 目标前进速度
# 线速度部分
terminated: 是否终止的标志
# 提取 xy 方向的速度
v_target_xy = cmd_lin_vel [ : , : 2 ] # [num_envs, 2]
返回:
v_actual_xy = base_lin_vel [ : , : 2 ] # [num_envs, 2]
总奖励值
# 计算偏差的范数平方
"""
v_diff_xy = v_target_xy - v_actual_xy
# 计算平顺性奖励 - 惩罚pitch和roll的剧烈变化
v_diff_norm_squared = torch . sum ( v_diff_xy * * 2 , dim = 1 , keepdim = True )
# 只取pitch和roll的加速度 (忽略yaw)
# 计算线速度跟踪的奖励
pitch_roll_acc = root_ang_acc [ : , : 2 ] # [num_envs, 2 ]
linear_error = torch . exp ( - v_diff_norm_squared / sigma_squared ) # [num_envs, 1 ]
smoothness_penalty = torch . sum ( torch . square ( pitch_roll_acc ) , dim = 1 ) # [num_envs]
# 角速度部分
smoothness_reward = - rew_scale_smoothness * smoothness_penalty * dt
omega_target_z = cmd_ang_vel [ : , 2 ] . unsqueeze ( 1 ) # [num_envs, 1]
# 速度跟踪奖励 (前进方向x轴)
omega_actual_z = base_ang_vel [ : , 2 ] . unsqueeze ( 1 ) # [num_envs, 1]
velocity_error = torch . abs ( base_lin_vel [ : , 0 ] - target_velocity [ : , 0 ] ) # [num_envs]
# 计算偏差
velocity_reward = - rew_scale_velocity * velocity_error
omega_diff_z = omega_target_z - omega_actual_z # [num_envs, 1]
# 存活奖励
omega_diff_squared = omega_diff_z . pow ( 2 ) # [num_envs, 1]
alive_reward = rew_scale_alive * ( ~ terminated ) . float ( )
# 计算角速度跟踪的奖励
# 总奖励
angular_error = torch . exp ( - omega_diff_squared / sigma_squared ) # [num_envs, 1]
# total_reward = alive_reward + velocity_reward + smoothness_reward
# total_reward = alive_reward + smoothness_reward
# total_reward = velocity_reward
total_reward = smoothness_reward
return total_reward
tracking_reward = linear_error * rew_scale_lin_vel * 1.0 * dt + angular_error * rew_scale_ang_vel * 0.5 * dt
# # 调试打印张量大小
# print(f"linear_error: {linear_error.shape}, angular_error: {angular_error.shape}, tracking_reward: {tracking_reward.shape}")
# z 轴平稳性 抑制z轴跳动
v_z = base_ang_vel [ : , 2 : 3 ] # 提取 z 轴分量,保持维度为 [num_envs, 1]
v_z_squared = v_z . pow ( 2 ) # 计算平方
z_reward = - 1.0 * v_z_squared * rew_scale_z * 4.0 * dt
# # 调试打印张量大小
# print(f"v_z: {v_z.shape}, v_z_squared: {v_z_squared.shape}, z_reward: {z_reward.shape}")
# 角速度平稳性 抑制xy
omega_xy = base_ang_vel [ : , : 2 ] # shape: [num_envs, 2]
omega_xy_norm_squared = torch . sum ( omega_xy . pow ( 2 ) , dim = 1 , keepdim = True ) # shape: [num_envs, 1]
omega_xy_reward = - 1.0 * omega_xy_norm_squared * rew_scale_orientation * 0.05 * dt
# # 调试打印张量大小
# print(f"omega_xy: {omega_xy.shape}, omega_xy_norm_squared: {omega_xy_norm_squared.shape}, omega_xy_reward: {omega_xy_reward.shape}")
# # 行动率奖励
# action_rate = torch.norm(actions - last_actions, dim=1, keepdim=True) # shape: [num_envs, 1]
# action_rate_reward = -1.0 * action_rate * rew_scale_action_rate
total_reward = tracking_reward + z_reward + omega_xy_reward
# # 调试打印张量大小
# print(f"tracking_reward: {tracking_reward.shape}, z_reward: {z_reward.shape}, omega_xy_reward: {omega_xy_reward.shape}, total_reward: {total_reward.shape}")
return total_reward