Lesson 40: Control of external devices


[1]:
import time

import serial
import serial.tools.list_ports

import bokeh.layouts
import bokeh.models
import bokeh.io

notebook_url = "localhost:8888"
bokeh.io.output_notebook()
Loading BokehJS ...

In this lesson, we introduce control of external devices using a serial connection. The important package for this purpose is PySerial, which you can install if you haven’t already using

conda install pyserial

It enables communication with devices over a serial port, often a USB port. When you import it, you import it by the name serial.

Because this lesson connects to a physical device, it really must be run in a live Jupyter notebook with a device connected.

Arduino

Of course to do this tutorial, we should have an actual device with which to communicate. Therefore, I encourage you to actually build the device below and connect it to your computer and work through this lesson running a live Jupyter notebook.

For our device in this example, we will use an Arduino Uno board. These ≈$20 boards offer great functionality and are fun to just play with. Beyond, that, they are actually quite useful in research applications and have widespread use in research labs across disciplines, including in the biological sciences.

We will use our board with a simple circuit, shown below.

Arduino

At the left of the Arduino Uno board in the drawing is a USB-A port, which you should connect to your computer. The breadboard features a potentiometer, whose output is connected to analog input pin A0, and two LEDs that are controlled by digital pins 2 and 6. The LEDs are connected to ground via 220 Ω resistors. We will demonstrate how to communicate with Arduino via Python by reading voltages from pin A0 and by turning the LEDs on and off.

The Arduino board is loaded with compiled code written in a flavor of C++. The code for this circuit is shown below. The code basically tells Arduino which pins do what, and then listens for signals coming from its serial connection. Based on what signal it gets, it will either send the measurement of voltage out the serial connection or turn the LEDs on or off. (We will not go over how to code Arduino in this bootcamp, but rather focus on how to interface with an external device using Python.)

const int voltagePin = A0;

const int redLEDPin = 6;
const int greenLEDPin = 2;

const int HANDSHAKE = 0;
const int VOLTAGE_REQUEST = 1;
const int RED_LED_ON = 2;
const int RED_LED_OFF = 3;
const int GREEN_LED_ON = 4;
const int GREEN_LED_OFF = 5;

const int ON_REQUEST = 6;
const int STREAM = 7;
const int READ_DAQ_DELAY = 8;

String daqDelayStr;

int inByte = 0;
int daqMode = ON_REQUEST;
int daqDelay = 100;   // delay between acquisitions in milliseconds

int value;
unsigned long time_ms;


void printVoltage() {
  // read value from analog pin
  value = analogRead(voltagePin);
  time_ms = millis();

  // Write the result
  if (Serial.availableForWrite()) {
    String outstr = String(String(time_ms, DEC) + "," + String(value, DEC));
    Serial.println(outstr);
  }
}


void setup() {
  // Set LEDs to off
  pinMode(redLEDPin, OUTPUT);
  pinMode(greenLEDPin, OUTPUT);
  digitalWrite(redLEDPin, LOW);
  digitalWrite(greenLEDPin, LOW);

  // initialize serial communication
  Serial.begin(115200);
}


void loop() {
  // If we're auto-transferring data (streaming mode)
  if (daqMode == STREAM) {
    printVoltage();
    delay(daqDelay);
  }

  // Check if data has been sent to Arduino and respond accordingly
  if (Serial.available() > 0) {
    // Read in request
    inByte = Serial.read();

    // Handshake
    if (inByte == HANDSHAKE){
      if (Serial.availableForWrite()) {
          Serial.println("Handshake message received.");
      }
    }

    // If data is requested, fetch it and write it
    else if (inByte == VOLTAGE_REQUEST) printVoltage();

    // Switch daqMode
    else if (inByte == ON_REQUEST) daqMode = ON_REQUEST;
    else if (inByte == STREAM) daqMode = STREAM;

    // Read in DAQ delay
    else if (inByte == READ_DAQ_DELAY) {
      while (Serial.available() == 0) ;
      daqDelayStr = Serial.readStringUntil('x');
      daqDelay = daqDelayStr.toInt();
    }

    // else, turn LEDs on or off
    else if (inByte == RED_LED_ON) digitalWrite(redLEDPin, HIGH);
    else if (inByte == RED_LED_OFF) digitalWrite(redLEDPin, LOW);
    else if (inByte == GREEN_LED_ON) digitalWrite(greenLEDPin, HIGH);
    else if (inByte == GREEN_LED_OFF) digitalWrite(greenLEDPin, LOW);
  }
}

According to this code, if we send integers over the serial connection to Arduino, it will behave according to the table below.

signal to Arduino

action

0

Handshake (establish and check connection with Arduino)

1

Send voltage from port A0

2

Turn red LED on

3

Turn red LED off

4

Turn green LED on

5

Turn green LED off

6

Toggle data acquisition mode to on request

7

Toggle data acquisition mode to stream

8

Alert Arduino that the next input is streaming delay

For the rest of this lesson, we will assume you have an Arduino connected to your machine configured as in the photo above, with the above Arduino code loaded in.

For convenience, we can declare variables matching these integer codes for this notebook.

[2]:
HANDSHAKE = 0
VOLTAGE_REQUEST = 1
RED_LED_ON = 2
RED_LED_OFF = 3
GREEN_LED_ON = 4
GREEN_LED_OFF = 5
ON_REQUEST = 6
STREAM = 7
READ_DAQ_DELAY = 8

Arduino will send data back to us in one of two modes, either on demand or as a stream. For on-demand data, it waits until it gets a signal from Python asking for data and then sends the time stamp and the voltage measured at analog input A0. For streaming data, it automatically sends data with a specified delay between data points.

Finding the port

With your Arduino plugged into your computer via a USB connection, you first need to find out which port it is. The names of the ports will differ based on your operating system and when you plugged it in (your OS may assign the ports different names). You can get a list of ports using the serial.tools.list_ports.comports() function.

[3]:
ports = serial.tools.list_ports.comports()

# Take a look
ports
[3]:
[<serial.tools.list_ports_common.ListPortInfo at 0x7f8b4538ddf0>,
 <serial.tools.list_ports_common.ListPortInfo at 0x7f8b4538d220>]

On my machine, there are two ports open. We can look at the manufacturer of the devices attached to the ports to find Arduino.

[4]:
[port.manufacturer for port in ports]
[4]:
[None, 'Arduino (www.arduino.cc)']

Clearly, the second port is Arduino. We can get then get a string for the port associated with the device.

[5]:
ports[1].device
[5]:
'/dev/cu.usbmodem141101'

On Windows, the manufacturer might not appear as Arduino. In this case, you should take a look at the devices for each port.

[6]:
[port.device for port in ports]
[6]:
['/dev/cu.Bluetooth-Incoming-Port', '/dev/cu.usbmodem141101']

The appropriate port for Windows will be something like 'COM3'.

For convenience, we can write a function to find Arduino. Called without arguments, it will give the port for Arduino if the manufacturer comes up as Arduino in the query. Otherwise, you can provide a string (like 'COM7') for the port, and it will connect to that port.

[7]:
def find_arduino(port=None):
    """Get the name of the port that is connected to Arduino."""
    if port is None:
        ports = serial.tools.list_ports.comports()
        for p in ports:
            if p.manufacturer is not None and "Arduino" in p.manufacturer:
                port = p.device
    return port

We’ll use it to get the port for Arduino. (Note that you may need to explicitly give the port, especially if you are using Windows.)

[8]:
port = find_arduino()

Opening a connection

When opening a connection with Python, you cannot have the Serial Monitor nor Serial Plotter of the Arduino IDE open, since they will keep the port busy and Python cannot communicate with Arduino.

To open a connection to the device, we instantiate a serial.Serial instance. When a port is first opened, there is some handshaking between the device and the computer that needs to happen. To be safe, I always close the port and reopen it to get an open port, and then wait one second using time.sleep() before I send or receive data from it. I then send and receive data packets. The first input/output from Arduino Uno board is nonsense (unique to that board, I think). I then send and receive a handshake message again to make sure everything is working properly.

[9]:
# Open port
arduino = serial.Serial(port, baudrate=115200, timeout=1)


def handshake_arduino(
    arduino, sleep_time=1, print_handshake_message=False, handshake_code=0
):
    """Make sure connection is established by sending
    and receiving bytes."""
    # Close and reopen
    arduino.close()
    arduino.open()

    # Chill out while everything gets set
    time.sleep(sleep_time)

    # Set a long timeout to complete handshake
    timeout = arduino.timeout
    arduino.timeout = 2

    # Read and discard everything that may be in the input buffer
    _ = arduino.read_all()

    # Send request to Arduino
    arduino.write(bytes([handshake_code]))

    # Read in what Arduino sent
    handshake_message = arduino.read_until()

    # Send and receive request again
    arduino.write(bytes([handshake_code]))
    handshake_message = arduino.read_until()

    # Print the handshake message, if desired
    if print_handshake_message:
        print("Handshake message: " + handshake_message.decode())

    # Reset the timeout
    arduino.timeout = timeout


# Call the handshake function
handshake_arduino(arduino, print_handshake_message=True, handshake_code=HANDSHAKE)
Handshake message: Handshake message received.

A few comments about the above code, bearing in mind that the Python variable arduino is a serial.Serial instance.

  • The arduino.timeout attribute sets the maximum time in seconds to wait for serial communication.

  • To handshake, we need to send code 0 to the Arduino. It must be sent as bytes, machine numbers Arduino can understand. To convert an integer to bytes in Python, we use the built-in bytes() function. It accepts an iterable (like a list of tuple) of ints and converts them to bytes. So, the signal we would send for code 0 is bytes([0]).

  • The arduino.read_all() function reads all bytes that are in the input buffer on the Python side.

  • The arduino.read_until() function reads from the input buffer on the Python side until a newline character is encountered. This is convenient because the Python interpreter moves along way faster than Arduino can perform calculations and write data over USB. By asking Python to read until it hits a newline character, it has to wait until the complete message is sent by Arduino.

  • The message sent from Arduino is a bytesarray. To convert it to a string, use the decode() method, as we did with handshake_message.decode() in the above function.

The port is currently open, and I’m not going to anything with it now, so I am going to close it. This is very important: Make sure you close your serial connection when you are done with it.

[10]:
arduino.close()

When possible, it is good practice to instead use context management when opening a serial connection. That way, it is always guaranteed to close, even when things may go awry.

[11]:
with serial.Serial(port, baudrate=115200, timeout=1) as arduino:
    handshake_arduino(arduino)

    # And the rest of what you want to do follows....

“Hello, world,” a.k.a. turning on an LED

The “Hello, world” of electronic circuits is turning on an LED. To turn on the red LED, we need to send code 2 to the Arduino. It must be sent as bytes, machine numbers Arduino can understand. So, the signal we would send for code 2 is:

[12]:
bytes([2])
[12]:
b'\x02'

Let’s open up a port to Arduino and send a signal to turn on the red LED.

[13]:
with serial.Serial(port, baudrate=115200, timeout=1) as arduino:
    handshake_arduino(arduino)

    # Turn on the red LED
    arduino.write(bytes([RED_LED_ON]))

Now that we know how to turn an LED on (and off), we can make a little disco party!

[14]:
with serial.Serial(port, baudrate=115200, timeout=1) as arduino:
    handshake_arduino(arduino)

    # Flash the LEDs
    for _ in range(40):
        arduino.write(bytes([RED_LED_ON]))
        time.sleep(0.05)
        arduino.write(bytes([GREEN_LED_ON]))
        time.sleep(0.05)
        arduino.write(bytes([RED_LED_OFF]))
        time.sleep(0.05)
        arduino.write(bytes([GREEN_LED_OFF]))
        time.sleep(0.05)

Controlling Arduino with widgets

While calling a function to send and retrieve data from an external device (in this case our Arduino board) is useful, it is more convenient to enable control of the device using widgets.

To do so, we need to open the connection to the Arduino and leave it open (outside of context management). Since we will want to open and close these ports manually, including having all of the (un)pleasantries of connecting and sending and receiving the first few test packets of data, we will write a function to give us an open serial connection to the Arduino (knowing of course that we will need to close it).

[15]:
def open_arduino(port, baudrate=115200, timeout=1):
    """Open a connection with an Arduino device and
    handshake to get ready for use."""
    # Open port
    arduino = serial.Serial(port, baudrate=baudrate, timeout=timeout)

    # Close and reopen
    arduino.close()
    arduino.open()

    handshake_arduino(arduino)

    return arduino

Let’s go ahead and open the connection.

[16]:
arduino = open_arduino(port)

Widgets and callbacks

As a first step in setting up control of the device, we will make toggle buttons to turn the red and green LEDs on and off. We can make the buttons using pn.widgets.Toggle(), but instead we will directly use Bokeh widgets. We do this because connecting to streaming data, which we will do in the next lesson, is a bit easier using base Bokeh. Bokeh apps are also more configurable and can have better performance because of less overhead.

To build a Bokeh app to use in a Jupyter notebook, we need to write a function of with call signature app(doc). Within that function, we build the elements we want in the app, in this case just the toggle and its callback. Once those elements are defined, they need to be added to the doc using doc.add_root(). The code below accomplishes this.

[17]:
def LED_app(doc):
    """Make a toggle for turning LEDs on and off"""

    def red_callback(attr, old, new):
        if new:
            arduino.write(bytes([RED_LED_ON]))
        else:
            arduino.write(bytes([RED_LED_OFF]))

    def green_callback(attr, old, new):
        if new:
            arduino.write(bytes([GREEN_LED_ON]))
        else:
            arduino.write(bytes([GREEN_LED_OFF]))

    # Set up toggles
    red_LED_toggle = bokeh.models.Toggle(
        label="Red LED", button_type="danger", width=100,
    )
    green_LED_toggle = bokeh.models.Toggle(
        label="Green LED", button_type="success", width=100,
    )

    # Link callbacks
    red_LED_toggle.on_change("active", red_callback)
    green_LED_toggle.on_change("active", green_callback)

    # Lay out the toggles
    layout = bokeh.layouts.row(
        red_LED_toggle, bokeh.layouts.Spacer(width=15), green_LED_toggle
    )

    doc.add_root(layout)

Some comments:

  • We use the on_change() method of a toggle widget. The callback function for on-change behavior must have call signature callback(attr, old, new), where attr is an attribute of the widget, old is the pre-change value of that attribute, and new is the post-change value of the attribute.

  • We instantiate a toggle with bokeh.models.Toggle. Instead of name like we would use with Panel, we use the kwarg label.

  • A toggle has an active attribute, which is True when the toggle is on and False when off. Whenever that value changes, the callback is triggered.

  • We use button_type="danger" to give us a red button and button_type="success" to give us a green button.

To view the widget in the notebook, use bokeh.io.show(). Note that we called bokeh.io.output_notebook() earlier in this notebook, which means that the app will display in the notebook. We also defined the notebook_url, which can be found by looking in your browser’s address bar. In my case, the notebook_url is "localhost:8888". Note also that Bokeh apps will not be displayed in the static HTML rendering of this notebook, so if you are reading this from the course website, you will see no output from the cell below.

[18]:
bokeh.io.show(LED_app, notebook_url=notebook_url)

The LEDs on the Arduino can then be toggled using those two buttons.

As we shift gears to retrieving data from Arduino, we will keep the port open.

Retrieving data from Arduino

So far, we have used our serial connection to toggle LEDs on and off, but ultimately, we would like to receive data from the device. To do this, we can send data requests and then read what comes back.

Based on the above Arduino code, when we are in ON_REQUEST mode, Ardunio waits for a request for data to come through its serial connection to the computer. Upon receipt of a request, it sends data back. The time and voltage data come in as a byte string (Python data type bytes) containing the time and voltage separated by a comma. The byte string ends with both a carriage return (\r) and a newline (\n). To convert the byte string to numbers, we need to strip the carriage return and newline and decode the byte string into the Python strings we are used to using the decode() method. Finally, we can split the resulting string at the comma to get the voltage and time point for each.

Arduino values from analog inputs are 10 bit unsigned integers, with values ranging from 0 to 1023. So, when the voltage is written out, it will be an integer, which we need to convert to a voltage, knowing that the voltage on the Arduino Uno goes from zero to five volts. The time stamps are in milliseconds, transmitted as 32-bit unsigned integers. So, we should convert the voltage number we receive to Volts. The function below parses a byte string sent from Arduino.

[19]:
def parse_raw(raw):
    """Parse bytes output from Arduino."""
    raw = raw.decode()
    if raw[-1] != "\n":
        raise ValueError(
            "Input must end with newline, otherwise message is incomplete."
        )

    t, V = raw.rstrip().split(",")

    return int(t), int(V) * 5 / 1023

We can make a button that requests data from Arduino when pressed. We will link it to a function that sends a request and gets data, appending the results to lists. First, we’ll set up the data acquisition function. It must take a single argument (which we will ignore) to have the correct call signature to be used when linked to a Panel button.

[20]:
def request_single_voltage(arduino):
    """Ask Arduino for a single data point"""
    # Ask Arduino for data
    arduino.write(bytes([VOLTAGE_REQUEST]))

    # Read in the data
    raw = arduino.read_until()

    # Parse and return
    return parse_raw(raw)

We can call the function to get single time, voltage pairs.

[21]:
request_single_voltage(arduino)
[21]:
(9570, 1.9012707722385143)

If we like, we can connect this function to a button and grab voltages and time points to store in a list. To do so, we set up the lists and then append to them in a callback.

Note that a button is different from a toggle button. A toggle button stay pressed when pushed, and then stays unpressed when pushed again. This is like a power button. A button, on the other hand, just registers a click, like a mouse button. Therefore,for a button, we use an on_click() method instead of on_change() to link to the callback.

Again, Bokeh apps will only appear in a running notebook, so the button below will not appear in the static rendering of this notebook.

[22]:
time_ms = []
voltage = []

def daq_app(doc):
    """Button for acquiring time stamp and voltage and store in lists."""

    def daq_callback(event):
        t, V = request_single_voltage(arduino)
        time_ms.append(t)
        voltage.append(V)

    # Set up the button
    daq_button = bokeh.models.Button(label="Acquire", button_type="primary")
    daq_button.on_click(daq_callback)

    doc.add_root(daq_button)


bokeh.io.show(daq_app, notebook_url=notebook_url)

After hitting the button a few times (while turning the knob on the potentiometer), I got updated lists.

[23]:
print("time (ms)    voltage (V)")
for t, V in zip(time_ms, voltage):
    print(t, '      ', V)
time (ms)    voltage (V)
15876        1.9012707722385143
16411        2.0234604105571847
16954        2.5757575757575757
17379        1.8132942326490713
17931        1.3343108504398826
18499        2.482893450635386
19093        3.6070381231671553
19643        3.5826001955034212
20179        2.8299120234604107
20700        2.1896383186705766

Great! We have demonstrated that we can control an external device by sending signals and can retrieve data from the device as well.

In the next lesson, we will use the same setup to build apps to continuously retrieve data from Arduino while still controlling it. This requires asynchronous computing, and therefore a bit more sophistication.

Computing environment

[24]:
%load_ext watermark
%watermark -v -p serial,bokeh,jupyterlab
Python implementation: CPython
Python version       : 3.8.10
IPython version      : 7.22.0

serial    : 3.5
bokeh     : 2.3.2
jupyterlab: 3.0.14