Source code for smartbot_irl.data._data
from dataclasses import dataclass, fields, is_dataclass
from typing import Any, Dict, Optional
from ._type_maps import (
IMU,
ArucoMarkers,
Bool,
JointState,
LaserScan,
Odometry,
Pose,
PoseArray,
String,
)
[docs]
def list_sensor_columns() -> list[str]:
"""Return top-level keys from flatten()."""
s = SensorData.initialized()
return list(s.flatten().keys())
def flatten_generic(prefix: str, obj: Any) -> Dict[str, Any]:
"""
Flatten dataclasses and dicts.
Leave lists/tuples intact as a single object.
"""
flat = {}
if obj is None:
return flat
# Dataclass → expand fields
if is_dataclass(obj):
for f in fields(obj):
val = getattr(obj, f.name)
key = f'{prefix}_{f.name}'
flat.update(flatten_generic(key, val))
return flat
# Dict → expand key/value pairs
if isinstance(obj, dict):
for k, v in obj.items():
key = f'{prefix}_{k}'
flat.update(flatten_generic(key, v))
return flat
# Lists / tuples → DO NOT flatten, store as whole object
if isinstance(obj, (list, tuple)):
flat[prefix] = obj
return flat
# Primitive value
flat[prefix] = obj
return flat
[docs]
class SensorData:
"""
Container for all SmartBot sensor topics.
Each field starts as ``None`` and becomes a populated message
object (LaserScan, Odometry, etc.) once that topic is received.
"""
def __init__(self) -> None:
self.odom: Odometry = Odometry()
self.scan: LaserScan = LaserScan()
self.joints: JointState = JointState()
self.aruco_poses: PoseArray = PoseArray()
self.imu: IMU = IMU()
self.gripper_curr_state: String = String()
self.manipulator_curr_preset: String = String()
# self.seen_hexes: PoseArray = PoseArray()
self.seen_hexes: ArucoMarkers = ArucoMarkers()
self.seen_robots: PoseArray = PoseArray()
[docs]
@classmethod
def initialized(cls) -> 'SensorData':
"""
Return a new SensorData with all message objects constructed using
their default constructors. Useful for simulation environments
where we don't wait for ROS topics to populate.
"""
self = cls()
self.odom = Odometry()
self.scan = LaserScan()
self.joints = JointState()
self.aruco_poses = PoseArray()
self.imu = IMU()
self.gripper_curr_state = String()
self.manipulator_curr_preset = String()
self.seen_hexes = ArucoMarkers()
self.seen_robots = PoseArray()
return self
# ------------------------------------------------------------------
[docs]
def to_ros(self) -> dict:
"""Convert only populated fields to ROS-like dicts."""
d = {}
if self.odom:
d['odom'] = self.odom.to_ros()
if self.scan:
d['scan'] = self.scan.to_ros()
if self.joints:
d['joints'] = self.joints.to_ros()
if self.aruco_poses:
d['aruco_poses'] = self.aruco_poses.to_ros()
if self.imu:
d['imu'] = self.imu.__dict__
if self.gripper_curr_state:
d['gripper_curr_state'] = self.gripper_curr_state.to_ros()
if self.manipulator_curr_preset:
d['manipulator_curr_preset'] = self.manipulator_curr_preset.to_ros()
if self.seen_hexes:
d['seen_hexes'] = self.seen_hexes.to_ros()
if self.seen_robots:
d['seen_robots'] = self.seen_robots.to_ros()
return d
def __repr__(self):
keys = [k for k, v in vars(self).items() if v is not None]
missing = [k for k, v in vars(self).items() if v is None]
return f'SensorData(populated={keys}, missing={missing})'
[docs]
def flatten(self) -> dict:
"""Return a partially flattened dict of all sensor fields."""
out = {}
for name, value in vars(self).items():
out.update(flatten_generic(name, value))
return out
[docs]
@dataclass
class Command:
"""
High-level command for SmartBot.
Supports both:
- Differential drive control (linear + angular velocity)
- Direct wheel velocity control (left/right)
- Manipulator preset and gripper control
"""
wheel_vel_left: Optional[float] = None
wheel_vel_right: Optional[float] = None
linear_vel: Optional[float] = None
angular_vel: Optional[float] = None
gripper_closed: Optional[bool] = None
manipulator_presets: Optional[str] = None
# -------------------------------------------------------------
[docs]
def to_ros(self) -> dict:
"""
Convert this Command into a dictionary that rosliby can publish.
"""
msgs = {}
# velocity command (Twist)
if self.linear_vel is not None or self.angular_vel is not None:
msgs['geometry_msgs/Twist'] = {
'linear': {'x': self.linear_vel or 0.0, 'y': 0.0, 'z': 0.0},
'angular': {'x': 0.0, 'y': 0.0, 'z': self.angular_vel or 0.0},
}
# manipulator preset (String)
if self.manipulator_presets is not None:
msgs['std_msgs/String'] = {'data': str(self.manipulator_presets)}
# gripper state (Bool)
if self.gripper_closed is not None:
msgs['std_msgs/Bool'] = {'data': bool(self.gripper_closed)}
return msgs