Joint Trajectory File Playback - Code Walkthrough

From sdk-wiki
Jump to: navigation, search

Introduction

This example demonstrates the usage of the Joint Trajectory Action Server to command raw joint position commands. The main() creates and instance of the Trajectory class and calls the parse_file() method. This method is responsible for parsing the input file and packing them as Request message for the Joint Trajectory Action server. The start() method is then called, which sends the Request messages to the action server. Finally, wait() is then called, which verifies if the trajectory execution was successful.< /br> Action Servers -robot/limb/left/follow_joint_trajectory, robot/limb/right/follow_joint_trajectory.

Code Walkthrough

Now, let's break down the code.

  1. import argparse
  2. import operator
  3. import sys
  4.  
  5. from bisect import bisect
  6. from copy import copy
  7. from os import path
  8.  
  9. import rospy
  10.  
  11. import actionlib
  12.  
  13. from control_msgs.msg import (
  14.     FollowJointTrajectoryAction,
  15.     FollowJointTrajectoryGoal,
  16. )
  17. from trajectory_msgs.msg import (
  18.     JointTrajectoryPoint,
  19. )
  20.  
  21. import baxter_interface
  22.  
  23. from baxter_interface import CHECK_VERSION

This imports the baxter interface for accessing the limb and the gripper class. The actionlib is imported to use its Action Server class.The CHECK_VERSION is imported to check if the software running on the robot would be compatible with this local version. It is not necessary to check the version in custom programs.

  1. class Trajectory(object):
  2.     def __init__(self):
  3.         #create our action server clients
  4.         self._left_client = actionlib.SimpleActionClient(
  5.             'robot/limb/left/follow_joint_trajectory',
  6.             FollowJointTrajectoryAction,
  7.         )
  8.         self._right_client = actionlib.SimpleActionClient(
  9.             'robot/limb/right/follow_joint_trajectory',
  10.             FollowJointTrajectoryAction,
  11.         )
  12.  
  13.         #verify joint trajectory action servers are available
  14.         l_server_up = self._left_client.wait_for_server(rospy.Duration(10.0))
  15.         r_server_up = self._right_client.wait_for_server(rospy.Duration(10.0))
  16.         if not l_server_up or not r_server_up:
  17.             msg = ("Action server not available."
  18.                    " Verify action server availability.")
  19.             rospy.logerr(msg)
  20.             rospy.signal_shutdown(msg)
  21.             sys.exit(1)

Action server clients for the left and right limbs are created. The existence of action servers for both the limbs are checked with a timeout of 10 seconds. If they are not available, a shutdown signal is sent and the program exits.

  1.         #create our goal request
  2.         self._l_goal = FollowJointTrajectoryGoal()
  3.         self._r_goal = FollowJointTrajectoryGoal()
  4.  
  5.         #limb interface - current angles needed for start move
  6.         self._l_arm = baxter_interface.Limb('left')
  7.         self._r_arm = baxter_interface.Limb('right')
  8.  
  9.         #gripper interface - for gripper command playback
  10.         self._l_gripper = baxter_interface.Gripper('left', CHECK_VERSION)
  11.         self._r_gripper = baxter_interface.Gripper('right', CHECK_VERSION)

The request message that will hold the goal positions for both the limbs' action servers are created. The current joint and gripper positions are also captured.

  1.         if self._l_gripper.error():
  2.             self._l_gripper.reset()
  3.         if self._r_gripper.error():
  4.             self._r_gripper.reset()
  5.         if (not self._l_gripper.calibrated() and
  6.             self._l_gripper.type() != 'custom'):
  7.             self._l_gripper.calibrate()
  8.         if (not self._r_gripper.calibrated() and
  9.             self._r_gripper.type() != 'custom'):
  10.             self._r_gripper.calibrate()

The error() method returns whether or not the gripper is in an error state. The possible cause of errors might be over/under-voltage, over/under-current, motor faults, etc. The calibrated() method returns a boolean, true if the gripper is already calibrated, while calibrate() performs the actual calibration of the grippers. type() returns the type of the gripper (custom, electric, etc). The calibration check is skipped if the gripper is a custom gripper.

  1.         #gripper goal trajectories
  2.         self._l_grip = FollowJointTrajectoryGoal()
  3.         self._r_grip = FollowJointTrajectoryGoal()
  4.  
  5.         #param namespace
  6.         self._param_ns = '/rsdk_joint_trajectory_action_server/'
  7.  
  8.         #gripper control rate
  9.         self._gripper_rate = 20.0  # Hz

The request message that will hold the goal positions for both the grippers are created. The namespace is modified and the grippers' control rates are modified.

  1.     def _execute_gripper_commands(self):
  2.         start_time = rospy.get_time()
  3.         r_cmd = self._r_grip.trajectory.points
  4.         l_cmd = self._l_grip.trajectory.points
  5.         pnt_times = [pnt.time_from_start.to_sec() for pnt in r_cmd]
  6.         end_time = pnt_times[-1]
  7.         rate = rospy.Rate(self._gripper_rate)
  8.         now_from_start = rospy.get_time() - start_time

The r_cmd and l_cmd variables holds the grippers' positions that were parsed from the file. The corresponding time stamps are read into pnt_times variable.

  1.         while(now_from_start < end_time + (1.0 / self._gripper_rate) and
  2.               not rospy.is_shutdown()):
  3.             idx = bisect(pnt_times, now_from_start) - 1
  4.             if self._r_gripper.type() != 'custom':
  5.                 self._r_gripper.command_position(r_cmd[idx].positions[0])
  6.             if self._l_gripper.type() != 'custom':
  7.                 self._l_gripper.command_position(l_cmd[idx].positions[0])
  8.             rate.sleep()
  9.             now_from_start = rospy.get_time() - start_time

idx holds the index of the timestamp from the file, relative to the current one. It is important to note that the velocities at which the Joint Positions were traversed are preserved during the playback. The corresponding grippers' positions are then commanded to the action server.

  1.     def _clean_line(self, line, joint_names):
  2.         """
  3.        Cleans a single line of recorded joint positions
  4.  
  5.        @param line: the line described in a list to process
  6.        @param joint_names: joint name keys
  7.  
  8.        @return command: returns dictionary {joint: value} of valid commands
  9.        @return line: returns list of current line values stripped of commas
  10.        """
  11.         def try_float(x):
  12.             try:
  13.                 return float(x)
  14.             except ValueError:
  15.                 return None
  16.         #convert the line of strings to a float or None
  17.         line = [try_float(x) for x in line.rstrip().split(',')]
  18.         #zip the values with the joint names
  19.         combined = zip(joint_names[1:], line[1:])
  20.         #take out any tuples that have a none value
  21.         cleaned = [x for x in combined if x[1] is not None]
  22.         #convert it to a dictionary with only valid commands
  23.         command = dict(cleaned)
  24.         return (command, line,)

This try_float() function replaces the contents of the variable passed with float and none values. Then, a tuple is constructed by zipping the names passed with the values from the variable line. This may contain valid values as well as none values . After removing the none values, a dictionary is constructed. This dictionary is then parsed to create valid Joint commands.

  1.     def _add_point(self, positions, side, time):
  2.         """
  3.        Appends trajectory with new point
  4.  
  5.        @param positions: joint positions
  6.        @param side: limb to command point
  7.        @param time: time from start for point in seconds
  8.        """
  9.         #creates a point in trajectory with time_from_start and positions
  10.         point = JointTrajectoryPoint()
  11.         point.positions = copy(positions)
  12.         point.time_from_start = rospy.Duration(time)
  13.         if side == 'left':
  14.             self._l_goal.trajectory.points.append(point)
  15.         elif side == 'right':
  16.             self._r_goal.trajectory.points.append(point)
  17.         elif side == 'left_gripper':
  18.             self._l_grip.trajectory.points.append(point)
  19.         elif side == 'right_gripper':
  20.             self._r_grip.trajectory.points.append(point)

This method creates a request message of the JointTrajectoryPoint type and appends the goal position based on the side of the limb/gripper that is requesting. It compiles a list of Joint Positions as the trajectory.

  1.     def parse_file(self, filename):
  2.         """
  3.        Parses input file into FollowJointTrajectoryGoal format
  4.  
  5.        @param filename: input filename
  6.        """
  7.         #open recorded file
  8.         with open(filename, 'r') as f:
  9.             lines = f.readlines()
  10.         #read joint names specified in file
  11.         joint_names = lines[0].rstrip().split(',')
  12.         #parse joint names for the left and right limbs
  13.         for name in joint_names:
  14.             if 'left' == name[:-3]:
  15.                 self._l_goal.trajectory.joint_names.append(name)
  16.             elif 'right' == name[:-3]:
  17.                 self._r_goal.trajectory.joint_names.append(name)

The lines are split into a list with ',' as the delimiter to extract the joint names. The arm of the current joint, found by stripping the "_<joint>" (last three characters) from the joint_name, is checked and stored.

  1.         def find_start_offset(pos):
  2.             #create empty lists
  3.             cur = []
  4.             cmd = []
  5.             dflt_vel = []
  6.             vel_param = self._param_ns + "%s_default_velocity"
  7.             #for all joints find our current and first commanded position
  8.             #reading default velocities from the parameter server if specified
  9.             for name in joint_names:
  10.                 if 'left' == name[:-3]:
  11.                     cmd.append(pos[name])
  12.                     cur.append(self._l_arm.joint_angle(name))
  13.                     prm = rospy.get_param(vel_param % name, 0.25)
  14.                     dflt_vel.append(prm)
  15.                 elif 'right' == name[:-3]:
  16.                     cmd.append(pos[name])
  17.                     cur.append(self._r_arm.joint_angle(name))
  18.                     prm = rospy.get_param(vel_param % name, 0.25)
  19.                     dflt_vel.append(prm)
  20.             diffs = map(operator.sub, cmd, cur)
  21.             diffs = map(operator.abs, diffs)
  22.             #determine the largest time offset necessary across all joints
  23.             offset = max(map(operator.div, diffs, dflt_vel))
  24.             return offset

The first commanded position and the current position for all the joints are captured. The default values that were loaded into the param server are read. The largest time offset necessary across all the joints is calculated.

  1.         for idx, values in enumerate(lines[1:]):
  2.             #clean each line of file
  3.             cmd, values = self._clean_line(values, joint_names)
  4.             #find allowable time offset for move to start position
  5.             if idx == 0:
  6.                 start_offset = find_start_offset(cmd)
  7.             #add a point for this set of commands with recorded time
  8.             cur_cmd = [cmd[jnt] for jnt in self._l_goal.trajectory.joint_names]
  9.             self._add_point(cur_cmd, 'left', values[0] + start_offset)
  10.             cur_cmd = [cmd[jnt] for jnt in self._r_goal.trajectory.joint_names]
  11.             self._add_point(cur_cmd, 'right', values[0] + start_offset)
  12.             cur_cmd = [cmd['left_gripper']]
  13.             self._add_point(cur_cmd, 'left_gripper', values[0] + start_offset)
  14.             cur_cmd = [cmd['right_gripper']]
  15.             self._add_point(cur_cmd, 'right_gripper', values[0] + start_offset)

The non-float values are cleaned and a dictionary of valid Joint Commands is returned in the _clean_line() function as explained above. The time offset for move to start position is also calculated. The parsed set of recorded commands along with their recorded time is added to the goal list.

  1.     def start(self):
  2.         """
  3.        Sends FollowJointTrajectoryAction request
  4.        """
  5.         self._left_client.send_goal(self._l_goal)
  6.         self._right_client.send_goal(self._r_goal)
  7.         self._execute_gripper_commands()

This function sends the FollowJointTrajectoryAction request message which includes the recorded positions and their corresponding time, to the Joint Trajectory Action Server.

  1.     def stop(self):
  2.         """
  3.        Preempts trajectory execution by sending cancel goals
  4.        """
  5.         if (self._left_client.gh is not None and
  6.             self._left_client.get_state() == actionlib.GoalStatus.ACTIVE):
  7.             self._left_client.cancel_goal()
  8.  
  9.         if (self._right_client.gh is not None and
  10.             self._right_client.get_state() == actionlib.GoalStatus.ACTIVE):
  11.             self._right_client.cancel_goal()
  12.  
  13.         #delay to allow for terminating handshake
  14.         rospy.sleep(0.1)

The Trajectory execution is stopped by sending cancel goals to the Joint trajectory action server.

  1.     def wait(self):
  2.         """
  3.        Waits for and verifies trajectory execution result
  4.        """
  5.         #create a timeout for our trajectory execution
  6.         #total time trajectory expected for trajectory execution plus a buffer
  7.         last_time = self._r_goal.trajectory.points[-1].time_from_start.to_sec()
  8.         time_buffer = rospy.get_param(self._param_ns + 'goal_time', 0.0) + 1.5
  9.         timeout = rospy.Duration(last_time + time_buffer)
  10.  
  11.         l_finish = self._left_client.wait_for_result(timeout)
  12.         r_finish = self._right_client.wait_for_result(timeout)
  13.         l_result = (self._left_client.get_result().error_code == 0)
  14.         r_result = (self._right_client.get_result().error_code == 0)
  15.  
  16.         #verify result
  17.         if all([l_finish, r_finish, l_result, r_result]):
  18.             return True
  19.         else:
  20.             msg = ("Trajectory action failed or did not finish before "
  21.                    "timeout/interrupt.")
  22.             rospy.logwarn(msg)
  23.             return False

This method waits for the completion of the trajectory execution by Joint trajectory action server<code>.

  1. def main():
  2.     """RSDK Joint Trajectory Example: File Playback
  3.  
  4.    Plays back joint positions honoring timestamps recorded
  5.    via the joint_recorder example.
  6.  
  7.    Run the joint_recorder.py example first to create a recording
  8.    file for use with this example. Then make sure to start the
  9.    joint_trajectory_action_server before running this example.
  10.  
  11.    This example will use the joint trajectory action server
  12.    with velocity control to follow the positions and times of
  13.    the recorded motion, accurately replicating movement speed
  14.    necessary to hit each trajectory point on time.
  15.    """
  16.     epilog = """
  17. Related examples:
  18.  joint_recorder.py; joint_position_file_playback.py.
  19.    """
  20.     arg_fmt = argparse.RawDescriptionHelpFormatter
  21.     parser = argparse.ArgumentParser(formatter_class=arg_fmt,
  22.                                      description=main.__doc__,
  23.                                      epilog=epilog)
  24.     parser.add_argument(
  25.         '-f', '--file', metavar='PATH', required=True,
  26.         help='path to input file'
  27.     )
  28.     parser.add_argument(
  29.         '-l', '--loops', type=int, default=1,
  30.         help='number of playback loops. 0=infinite.'
  31.     )
  32.     # remove ROS args and filename (sys.arv[0]) for argparse
  33.     args = parser.parse_args(rospy.myargv()[1:])

The name source file to read the Joint Position is captured along with the number of times the trajectory has to be looped from the command line.

  1.     print("Initializing node... ")
  2.     rospy.init_node("rsdk_joint_trajectory_file_playback")
  3.     print("Getting robot state... ")
  4.     rs = baxter_interface.RobotEnable(CHECK_VERSION)
  5.     print("Enabling robot... ")
  6.     rs.enable()
  7.     print("Running. Ctrl-c to quit")
  8.  
  9.     traj = Trajectory()
  10.     traj.parse_file(path.expanduser(args.file))
  11.     #for safe interrupt handling
  12.     rospy.on_shutdown(traj.stop)
  13.     result = True
  14.     loop_cnt = 1
  15.     loopstr = str(args.loops)
  16.     if args.loops == 0:
  17.         args.loops = float('inf')
  18.         loopstr = "forever"
  19.     while (result == True and loop_cnt <= args.loops
  20.            and not rospy.is_shutdown()):
  21.         print("Playback loop %d of %s" % (loop_cnt, loopstr,))
  22.         traj.start()
  23.         result = traj.wait()
  24.         loop_cnt = loop_cnt + 1
  25.     print("Exiting - File Playback Complete")
  26.  
  27. if __name__ == "__main__":
  28.     main()

The node is initialized. An instance of the <code>Trajectory class is created. The parse_file() method is called to extract the recorded Joint Positions and their corresponding timestamps as explained above. The start() method is called to send the FollowJointTrajectory request message to the Joint Trajectory Action server. This loops for the user specified number of times.