Demo Mode - Code Walkthrough

The Demo mode is a simple example of generating and displaying an interactive UI on the Baxter Research Robot.

The core of this program is an image processing library that converts images between PNG, PIL (Python Imaging Library), OpenCV, and ROS Message formats. On top of that is a simple UI structure, with Window and Button classes that use the aforementioned library, in coordination with ROS messages from the robot, to generate and display the user interface.

[Figure: UI Documentation Diagram]

Image Processing - Displaying the pretty pictures

Let's start with the imports

    import cv
    import cv_bridge
    import PIL

PIL is used for loading and compositing PNG asset images. cv and cv_bridge are used for image manipulation and conversion to ROS messages.

XDisplay - Put it on the Big Screen

As with other functions interacting with Baxter's face, we will need to publish an Image message to the /robot/xdisplay topic to update the screen image.

To that end, our UI Controller, the highest element in the UI system, has a ui.xdisp property containing a publisher to that topic:

    self.xdisp = rospy.Publisher('/robot/xdisplay', ImageMsg, latch=True)

Short section, but it had to be done before we go on to the rest of this...

PIL to ROS - From humble beginnings

Let's start by taking a .png image asset and converting it into a ROS message.

The places we do this in the code are pretty spread out, so, for a simple case, let's load a single .png file with PIL:

    from PIL import Image

    from baxter_demo_ui import img_proc

    image_path = "path/to/your/image.png"
    img = Image.open(image_path)

Now, here is the code for gen_cv (in img_proc.py), which converts a single image from PIL format to an OpenCV image:

    def gen_cv(img):
        return PIL_to_cv(rgb_to_bgr(img))

    def rgb_to_bgr(img):
        r, g, b = img.split()
        return PIL.Image.merge('RGB', (b, g, r))

    def PIL_to_cv(img):
        ci = cv.CreateImage((1024, 600), cv.IPL_DEPTH_8U, 3)
        cv.SetData(ci, img.tostring(), 3072)
        return ci

To start, gen_cv takes a PIL image and converts it from RGB to BGR, the accepted encoding for publishing to the xdisplay. We then create an empty OpenCV image and copy the data from the BGR image into that object (the 3072 passed to cv.SetData is the row step: 1024 pixels * 3 bytes per pixel).

To convert this from an OpenCV image into a ROS message, we then use the cv_bridge function cv_to_imgmsg:

    def cv_to_msg(img):
        return cv_bridge.CvBridge().cv_to_imgmsg(img, encoding='bgr8')

With this code, you can take any number of canned images and push them to the screen in your scripts, but this by itself is not terribly useful or modular.
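
For reference, putting these pieces together might look something like this. This is a minimal sketch, not code from the demo itself: the node name and image path are placeholders, and the PNG is assumed to already be 1024x600 to match PIL_to_cv above.

    import rospy
    from sensor_msgs.msg import Image as ImageMsg
    from PIL import Image

    from baxter_demo_ui import img_proc

    rospy.init_node('xdisplay_image_example')  # placeholder node name
    xdisp = rospy.Publisher('/robot/xdisplay', ImageMsg, latch=True)

    # PIL -> OpenCV (BGR) -> ROS Image message, then publish to the screen
    img = Image.open('path/to/your/image.png')
    xdisp.publish(img_proc.cv_to_msg(img_proc.gen_cv(img)))
    rospy.spin()  # keep the node alive so the latched message stays available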

Anatomy of a Button - Layering images and text in PIL

We generate images for the different buttons available in the UI by combining icons (which carry their own alpha layer) with different button-state images, and drawing text on top of them.

We generate the images with our gen_image function

    def gen_image(self, btn, icon, offset):
        img = btn.resize(self._size, Image.ANTIALIAS)
        if icon is not None:
            img.paste(icon, tuple(offset), icon)
        return img

What this is doing is resizing the button itself (a PIL Image object loaded from a .png) to the size listed in the config, and layering the icon, another PIL image, on top of it at the given offset. We call tuple here because the offset is read from a JSON file and comes in as a list rather than a tuple. Passing icon a second time, as the third argument to paste, specifies the alpha mask for the paste; if an image contains its own alpha layer, you can simply use the image as its own mask.
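
As a standalone feel for that paste call, here is a minimal sketch (the file names, sizes, and offset are placeholders, not the demo's actual assets):

    from PIL import Image

    btn = Image.open('btn_idle.png').convert('RGB')  # placeholder button frame
    icon = Image.open('icon.png').convert('RGBA')    # placeholder icon with alpha

    img = btn.resize((250, 60), Image.ANTIALIAS)
    # Passing the icon as its own mask uses its alpha channel for transparency
    img.paste(icon, (10, 10), icon)
    img.save('button_preview.png')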

To cover the addition of text, we need to look into the button-drawing code for a moment.

We start by importing a few new items from PIL:

    from PIL import (
        Image,
        ImageDraw,
        ImageFont
    )

Then, when drawing the buttons for our window, we create a Draw instance using ImageDraw.Draw() and pass it through to each button's draw function. This is what we will use to layer the text on the image.

    tmp = copy(self._bg['img'])
    d = ImageDraw.Draw(tmp)
    for name, btn in self._buttons.items():
        if name == sel_btn:
            if disabled:
                state = 'pressed'
            else:
                state = 'selected'
        else:
            if disabled:
                state = 'disabled'
            else:
                state = 'idle'
        btn.draw(tmp, d, state)

Now, in the initialization of the Button, we create a font attribute, loaded from the .ttf file for that font. (share_path is a passed argument that points to the share/ directory of this package.)

    self._font = ImageFont.truetype('%s/Arial.ttf' % share_path, 30)

Finally, in the draw method of the Button class:

    def draw(self, img, draw, state):
        tmp = self._imgs[state]
        img.paste(tmp, self._offset, tmp)
        self.draw_label(draw)

    def draw_label(self, draw):
        label_x = (self._offset[0] + self._size[0] / 2 -
                   draw.textsize(self._label, self._font)[0] / 2)
        label_y = self._offset[1] + self._label_y
        draw.text((label_x, label_y), self._label,
                  fill='white', font=self._font)

Here, we grab the stored button image (with frame and icon) associated with the provided state and paste it onto the window image before calling draw_label. In draw_label, we simply derive x and y locations for the label and call the ImageDraw.Draw object's text() function. (Note that self._label is the actual label text.)
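
As a standalone illustration of that centering arithmetic, here is a minimal sketch (the font path, canvas size, and label are placeholders):

    from PIL import Image, ImageDraw, ImageFont

    img = Image.new('RGB', (300, 80), 'black')
    d = ImageDraw.Draw(img)
    font = ImageFont.truetype('path/to/Arial.ttf', 30)  # placeholder path
    label = 'Run Demo'

    # Center the label on the canvas, mirroring draw_label above
    w, h = d.textsize(label, font=font)
    d.text(((300 - w) / 2, (80 - h) / 2), label, fill='white', font=font)
    img.save('label_test.png')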

Anatomy of a Window - Handling of states

The function of Windows in this UI is to handle the state of the interface. The UI state is stored hierarchically: the UI controller has a selected_window, each window has a selected_button, and each window may have a parent, forming a chain of Parents.

The Parents are used for 2 things:

  • When a user presses "Back" on a window, it will attempt to set its parent as the new active window and re-draw.
  • When a window is drawn, it calls the draw() function for all of its parents recursively, having them draw their "disabled" state to show that they are not the focus, before overlaying its current image onto that base:
    def _draw_window(self, img, window, selected=True):
        # self._windows maps window names to Window objects
        if self._windows[window].parent:
            img = self._draw_window(img,
                                    self._windows[window].parent,
                                    selected=False)
        return self._windows[window].draw(img, selected)

Selected, here, refers to whether or not the window being drawn is the currently active window.

The main visual functionality of the UI comes from changing which window is drawn, in what state.
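
To make the hierarchy concrete, here is an illustrative sketch. These names are simplified stand-ins, not the actual baxter_demo_ui classes:

    class Window(object):
        def __init__(self, name, parent=None):
            self.name = name
            self.parent = parent        # name of the parent window, or None
            self.selected_button = 0    # index of the currently selected button

    windows = {'main': Window('main'),
               'demo': Window('demo', parent='main')}
    selected_window = 'demo'  # the controller's focus; 'main' draws disabled beneath it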

Interaction - Controlling the pretty pictures through use of buttons and a knob

The Demo Mode UI responds to the navigators on both arms, and incorporates the gripper_cuff example, which enables interaction with the cuff and cuff buttons.

We use the baxter_interface and baxter_dataflow signals to connect our UI interaction functions with on_change signals for the Buttons and wheel on the arm navigators.

    self._navigators = {'left': Navigator('left'),
                        'right': Navigator('right')}

    self._listeners_connected = False
    self._connect_listeners()

    def _connect_listeners(self):
        # Navigator OK Button
        self._navigators['left'].button0_changed.connect(self._left_ok_pressed)
        self._navigators['right'].button0_changed.connect(
            self._right_ok_pressed)

        # Navigator Wheel
        self._navigators['left'].wheel_changed.connect(self._left_wheel_moved)
        self._navigators['right'].wheel_changed.connect(
            self._right_wheel_moved)

        # Navigator Baxter Button
        self._navigators['left'].button2_changed.connect(self._enable)
        self._navigators['right'].button2_changed.connect(self._enable)

        # Navigator Back Button
        self._navigators['left'].button1_changed.connect(self.back)
        self._navigators['right'].button1_changed.connect(self.back)

        self._listeners_connected = True

So now, the buttons and wheel for each arm are attached to these UI functions.

The listener functions for buttons are very simple, reacting only when the button state changes to True:

    def back(self, v):
        if v:
            self.error_state = False
            if self.active_window.parent:
                self.kill_examples()
                self.set_active_window(self.active_window.parent)
                self.draw()

The listener for the navigator wheels is similar, but it responds regardless of the value it receives, incrementing or decrementing the scroll position in the UI depending on the signal.

    def _left_wheel_moved(self, v):
        self._wheel_moved(v, 'left')

    def _right_wheel_moved(self, v):
        self._wheel_moved(v, 'right')

    def _wheel_moved(self, v, side):
        if not self._active_example and self._wheel_ok:
            if v > 0:
                self.scroll(1)
            else:
                self.scroll(-1)
            self._wheel_ok = False
            rospy.Timer(rospy.Duration(.01), self._set_wheel_ok, oneshot=True)

    def scroll(self, direction):
        self.active_window.scroll(direction)
        self.draw()
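
_set_wheel_ok itself isn't shown in this walkthrough; a plausible sketch is simply the timer callback re-arming the debounce flag (the body here is an assumption, not the actual source):

    def _set_wheel_ok(self, event):
        # One-shot timer callback (event is a rospy TimerEvent); re-arm the wheel
        self._wheel_ok = True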

Important note - Enable Loop and Avoiding E-Stop Craziness

When you are in the demo mode, we assume that you want the robot to be enabled, so we have a loop in demo_ui that keeps the robot enabled and reacts to robot state changes.

    rospy.Timer(rospy.Duration(.5), self._enable)

The "_enable" function will attempt to enable the robot, and will throw an error screen up if it fails.

    def _enable(self, v=1):
        if v == 1 and not self._status.state().enabled:
            try:
                self._status.enable()
            except Exception:
                self.error_screen('stopped')
                return False
            if not self._status.state().enabled:
                self.error_screen('no_enable')
                return False
        self._enable_cuff()
        return True

Unfortunately, when the robot is E-Stopped, messages from the sensors become garbage, so the listeners we set up for the scroll wheels and buttons will receive bad data and throw errors when they try to call the callback functions.

To fix this issue, we set up another subscriber that watches the E-Stop state, disconnecting and reconnecting the listeners as it changes.

    self._estop_sub = rospy.Subscriber('robot/state', AssemblyState,
                                       self._estop_callback)

    def _estop_callback(self, msg):
        if self._estop_state != msg.stopped:
            self._estop_state = msg.stopped
            if msg.stopped and self._listeners_connected:
                self._disconnect_listeners()
            elif not msg.stopped and not self._listeners_connected:
                self._connect_listeners()

    def _disconnect_listeners(self):
        # Navigator OK Button
        self._navigators['left'].button0_changed.disconnect(
            self._left_ok_pressed)
        self._navigators['right'].button0_changed.disconnect(
            self._right_ok_pressed)

        # Navigator Wheel
        self._navigators['left'].wheel_changed.disconnect(
            self._left_wheel_moved)
        self._navigators['right'].wheel_changed.disconnect(
            self._right_wheel_moved)

        # Navigator Baxter Button
        self._navigators['left'].button2_changed.disconnect(self._enable)
        self._navigators['right'].button2_changed.disconnect(self._enable)

        # Navigator Back Button
        self._navigators['left'].button1_changed.disconnect(self.back)
        self._navigators['right'].button1_changed.disconnect(self.back)

        self._listeners_connected = False

Demo Functions - The Stuff We Do

We use a couple of nifty tricks to get the robot to run through our functions when buttons are pressed; here I'm going to cover a few of them.

Processes - Make It Do What It Do, CLI-Style

There is a class defined in baxter_procs called RosProcess that will allow you to spawn off a command-line process using subprocess.Popen. This is how we run most of the example programs.

    class RosProcess():
        def __init__(self, command, quiet=True, get_output=False, shell=True):
            # stdout=None leaves the child's output going to the terminal
            self.process = Popen(command, shell=shell, stdout=None,
                                 stdin=PIPE, stderr=STDOUT)
            self.quiet = quiet
            self.get_output = get_output

        def write(self, stdin):
            self.process.stdin.write(stdin)

        def run(self):
            stdout, stderr = self.process.communicate()
            if not self.quiet:
                print stdout
            if self.get_output:
                return stdout
            else:
                return self.process.returncode

This is used in demo_functions to run most of the example programs (such as the arm_puppet)

    def puppet(ui, side):
        proc = RosProcess('rosrun baxter_examples '
                          'joint_velocity_puppet.py -l %s' % side)
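
As a quick standalone illustration, a hypothetical one-off usage might look like this (the command here is just an example, not something the demo runs):

    # Assuming the RosProcess class above is in scope (it lives in baxter_procs)
    proc = RosProcess('rosrun baxter_tools enable_robot.py -e')
    ret = proc.run()  # blocks until the command finishes; returns its exit code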

Camera Overlay - Live on Baxter's Face!

The other nifty thing we do here is in the camera example, where we overlay the output from the cameras onto the current window image.

    def camera_disp(ui, cam_side):
        def _display(camera, name):
            for cam in ui.cameras:
                ui.cameras[cam].close()
            camera.resolution = (640, 400)
            camera.open()

        def _cam_to_screen(msg):
            newMsg = overlay(ui.img, msg, (1024, 600), (205, 140, 640, 400))
            ui.xdisp.publish(newMsg)

        ui.cam_sub = rospy.Subscriber(
            'cameras/%s_camera/image' % cam_side,
            ImageMsg,
            _cam_to_screen)

        camera = ui.cameras[cam_side]
        _display(camera, '%s_camera' % cam_side)

Each "cam" in ui.cameras is a CameraController from baxter_interface, corresponding to one of the physical cameras on baxter. This function will close all of the cameras when called in order to ensure that the one we want to view can be opened (as you cannot have all 3 cameras open simultaneously)

The way this function works is by linking a subscriber to the local function _cam_to_screen, which takes the UI's currently published image message and overlays the current camera image on top of it within a given bounding box.

The overlay() function lives in img_proc; it copies the current screen image, converts the camera message to an OpenCV image, and copies that into the given sub-rectangle before converting the result back to a ROS message.

    def overlay(old_img, new_img, original_size, new_rect):
        tmp = cv.CreateImage(original_size, cv.IPL_DEPTH_8U, 3)
        cv.Copy(old_img, tmp)
        sub = cv.GetSubRect(tmp, new_rect)
        cv_img = msg_to_cv(new_img)
        cv.Copy(cv_img, sub)
        return cv_to_msg(tmp)
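
msg_to_cv is referenced here but not shown; assuming it simply mirrors cv_to_msg with the same old-style cv_bridge API, it would look something like this:

    import cv_bridge

    def msg_to_cv(img):
        # Inverse of cv_to_msg: decode a ROS Image message into an OpenCV image
        return cv_bridge.CvBridge().imgmsg_to_cv(img, desired_encoding='bgr8')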