Lesson 31. Apps for controlling external devices


[1]:
import re
import asyncio
import time

import numpy as np
import pandas as pd

import serial
import serial.tools.list_ports

import bokeh.plotting
import bokeh.io
import bokeh.driving

notebook_url = 'localhost:8888'
bokeh.io.output_notebook()
Loading BokehJS ...

In this lesson, you will learn how to use Python’s built-in asynchronous capabilities to constantly receive data from Arduino without blocking so that you can use the Python interpreter to do other tasks.

Setup

The setup for this lesson is the same as the previous one. As a reminder, you should have an Arduino Uno board with the setup below connected to a USB port on your computer.

Arduino

You should also have the following code uploaded onto your Arduino.

const int voltagePin = A0;

const int redLEDPin = 6;
const int yellowLEDPin = 2;

const int HANDSHAKE = 0;
const int VOLTAGE_REQUEST = 1;
const int RED_LED_ON = 2;
const int RED_LED_OFF = 3;
const int YELLOW_LED_ON = 4;
const int YELLOW_LED_OFF = 5;

const int ON_REQUEST = 6;
const int STREAM = 7;
const int READ_DAQ_DELAY = 8;

String daqDelayStr;

int inByte = 0;
int daqMode = ON_REQUEST;
int daqDelay = 100;   // delay between acquisitions in milliseconds

int value;
unsigned long time_ms;


void printVoltage() {
  // read value from analog pin
  value = analogRead(voltagePin);
  time_ms = millis();

  // Write the result
  if (Serial.availableForWrite()) {
    String outstr = String(String(time_ms, DEC) + "," + String(value, DEC));
    Serial.println(outstr);
  }
}


void setup() {
  // Set LEDs to off
  pinMode(redLEDPin, OUTPUT);
  pinMode(yellowLEDPin, OUTPUT);
  digitalWrite(redLEDPin, LOW);
  digitalWrite(yellowLEDPin, LOW);

  // initialize serial communication
  Serial.begin(115200);
}


void loop() {
  // If we're auto-transferring data (streaming mode)
  if (daqMode == STREAM) {
    printVoltage();
    delay(daqDelay);
  }

  // Check if data has been sent to Arduino and respond accordingly
  if (Serial.available() > 0) {
    // Read in request
    inByte = Serial.read();

    // Handshake
    if (inByte == HANDSHAKE){
      if (Serial.availableForWrite()) {
          Serial.println("Handshake message received.");
      }
    }

    // If data is requested, fetch it and write it
    else if (inByte == VOLTAGE_REQUEST) printVoltage();

    // Switch daqMode
    else if (inByte == ON_REQUEST) daqMode = ON_REQUEST;
    else if (inByte == STREAM) daqMode = STREAM;

    // Read in DAQ delay
    else if (inByte == READ_DAQ_DELAY) {
      while (Serial.available() == 0) ;
      daqDelayStr = Serial.readStringUntil('x');
      daqDelay = daqDelayStr.toInt();
    }

    // else, turn LEDs on or off
    else if (inByte == RED_LED_ON) digitalWrite(redLEDPin, HIGH);
    else if (inByte == RED_LED_OFF) digitalWrite(redLEDPin, LOW);
    else if (inByte == YELLOW_LED_ON) digitalWrite(yellowLEDPin, HIGH);
    else if (inByte == YELLOW_LED_OFF) digitalWrite(yellowLEDPin, LOW);
  }
}

The necessary Python code is in the code cell below.

[2]:
def find_arduino(port=None):
    """Get the name of the port that is connected to Arduino."""
    if port is None:
        ports = serial.tools.list_ports.comports()
        for p in ports:
            if p.manufacturer is not None and "Arduino" in p.manufacturer:
                port = p.device
    return port


def handshake_arduino(
    arduino, sleep_time=1, print_handshake_message=False, handshake_code=0
):
    """Make sure connection is established by sending
    and receiving bytes."""
    # Close and reopen
    arduino.close()
    arduino.open()

    # Chill out while everything gets set
    time.sleep(sleep_time)

    # Set a long timeout to complete handshake
    timeout = arduino.timeout
    arduino.timeout = 2

    # Read and discard everything that may be in the input buffer
    _ = arduino.read_all()

    # Send request to Arduino
    arduino.write(bytes([handshake_code]))

    # Read in what Arduino sent
    handshake_message = arduino.read_until()

    # Send and receive request again
    arduino.write(bytes([handshake_code]))
    handshake_message = arduino.read_until()

    # Print the handshake message, if desired
    if print_handshake_message:
        print("Handshake message: " + handshake_message.decode())

    # Reset the timeout
    arduino.timeout = timeout

With these in hand, we can proceed to step up capability for controlling an external device and streaming data from it.

Why do we need asynchrony?

When you communicated with a device, there are plenty of other tasks you want the Python interpreter to be doing while it is acquiring data. At the very least, you may want it to be listening for more user input to stop acquiring data. But you may also want to perform calculations on the incoming data (such as digital filtering), control and/or receive data from other connected devices, or even just mess around in your Jupyter notebook.

In order to do these things, you want the data acquisition to happen asynchronously. You want the interpreter to occasionally read and parse data, but be free to do whatever else you want it to do when it is not reading and parsing data. That is, you do not want to tie up the interpreter all the time waiting for data to come in.

Reading data in chunks

As a first step toward asynchrony, we will write a function to read data in chunks. Instead of constantly monitoring the data coming in over the serial connection, we would rather occasionally check the serial connection to see if there are any data in the input buffer. If there is, we read in whatever is in the input buffer to clear it, go off and process that, and then wait a while before checking again. During that waiting time, you can have the interpreter do other tasks.

Warning: Don’t wait too long to read, though! You do not want to overrun the USB input buffer size on your computer. Arduino’s output buffer is 64 bytes, and computers can have default input buffer sizes as low as 64 bytes as well. (I think most computers these days have input buffer sizes around 1024 bytes, but it does vary from machine to machine.)

Chunk reading for non-corrupted data

The function below reads in all of the data that is in the input buffer and returns the data as a byte string. We should specify a short timeout so that reading will stop before the buffer starts filling up again.

[3]:
def read_all(ser, read_buffer=b"", **args):
    """Read all available bytes from the serial port
    and append to the read buffer.

    Parameters
    ----------
    ser : serial.Serial() instance
        The device we are reading from.
    read_buffer : bytes, default b''
        Previous read buffer that is appended to.

    Returns
    -------
    output : bytes
        Bytes object that contains read_buffer + read.

    Notes
    -----
    .. `**args` appears, but is never used. This is for
       compatibility with `read_all_newlines()` as a
       drop-in replacement for this function.
    """
    # Set timeout to None to make sure we read all bytes
    previous_timeout = ser.timeout
    ser.timeout = None

    in_waiting = ser.in_waiting
    read = ser.read(size=in_waiting)

    # Reset to previous timeout
    ser.timeout = previous_timeout

    return read_buffer + read

For our present application, in which we read in comma-delimited time-voltage data, the byte string returned from this function might look like this:

b'1032,541\r\n1052,542\r\n1073,554\r\n1093,5'

Note that it does not end in a carriage return and newline. Those characters might not be in the read buffer yet, and since we are not using read_until(), we will not keep reading until we get those terminating characters. So, if we are parsing the output of this function, we should keep the last incomplete part of the data (in this case, b'1093,5' around for the next read.

Here is a parser that returns both the times and voltages as lists, as well as the remaining bytes that we will pass as the read_buffer kwarg in the read_all() function. There is some error checking. The only allowed characters are carriage returns, new lines, commas, and digits. Any message having other characters is discarded.

[4]:
def parse_read(read):
    """Parse a read with time, voltage data

    Parameters
    ----------
    read : byte string
        Byte string with comma delimited time/voltage
        measurements.

    Returns
    -------
    time_ms : list of ints
        Time points in milliseconds.
    voltage : list of floats
        Voltages in volts.
    remaining_bytes : byte string
        Remaining, unparsed bytes.
    """
    time_ms = []
    voltage = []

    # Separate independent time/voltage measurements
    pattern = re.compile(b"\d+|,")
    raw_list = [
        b"".join(pattern.findall(raw)).decode()
        for raw in read.split(b"\r\n")
    ]

    for raw in raw_list[:-1]:
        try:
            t, V = raw.split(",")
            time_ms.append(int(t))
            voltage.append(int(V) * 5 / 1023)
        except:
            pass

    if len(raw_list) == 0:
        return time_ms, voltage, b""
    else:
        return time_ms, voltage, raw_list[-1].encode()

Chunk reading with corrupted data

We discovered that on Windows machines, reads made with pySerial can sometimes result in corrupted bytes. This makes the read-in string unusable, and in many cases un-parsable because the resulting bytes do not correspond to any characters in ASCII. I am not sure exactly why this happens, but I suspect it is due to the read of a given byte being incomplete, with the read being interrupted before the stop bit. To counteract this, we can instead read chunks that must terminate in a newline using read_until(). This blocks all other processes until the complete newline byte is read. This also ensures that all bytes preceding the newline are read in their entirety as well.

[5]:
def read_all_newlines(ser, read_buffer=b"", n_reads=4):
    """Read data in until encountering newlines.

    Parameters
    ----------
    ser : serial.Serial() instance
        The device we are reading from.
    n_reads : int
        The number of reads up to newlines
    read_buffer : bytes, default b''
        Previous read buffer that is appended to.

    Returns
    -------
    output : bytes
        Bytes object that contains read_buffer + read.

    Notes
    -----
    .. This is a drop-in replacement for read_all().
    """
    raw = read_buffer
    for _ in range(n_reads):
        raw += ser.read_until()

    return raw

asyncio

Python has handy built-in asynchronous capabilities using the asyncio module from the standard library. It was introduced recently, in Python 3.5, and has had changes and deprecations since. The version in Python 3.8 has nice high-level functionality and has a stable API, so it is important that you are using Python 3.8.

I will give a brief overview of how it works here, but you would be well-served to read the documentation, most importantly the coroutines and tasks section.

At the center of asyncio’s high-level functionality are awaitables. An awaitable is a process that the interpreter can suspend such that it is not blocking the interpreter from doing other tasks. A very important awaitable is asyncio.sleep(), which is one we will put to use.

Aside from sleeping, the awaitables we will use are coroutines and tasks. You can think of a coroutine as a function that you can start and stop and start again. A task runs a coroutine. As usual, this is best seen by example.

We will start by making a coroutine that is a greeting in English. It says “hello” and then waits one second to say “world.” It returns a string describing what the message was. We would do this in a synchronous way (so it is a function and not a coroutine) like this:

[6]:
def english(exclaim=False):
    print("Hello, ")
    time.sleep(1)
    print("world" + ("!" if exclaim else "."))

    return "The message was a greeting to the world."

We can run this function, and it works as expected.

[7]:
message = english(exclaim=True)
Hello,
world!

The problem is that the function, like all functions in Python, blocked. While waiting for a second to see “world,” the Python interpreter was busy (it was busy sleeping!). When a function of piece of code is running and prevents the interpreter from doing anything else, it is said to be blocking.

Now, let’s write an asynchronous version, that is a coroutine.

[8]:
async def english_async(exclaim=False):
    print("Hello, ")
    await asyncio.sleep(1)
    print("world" + ("!" if exclaim else "."))

    return "The message was a greeting to the world."

The async def keyword signifies that this is not a function, but a coroutine. That means the interpreter can start running the coroutine, leave it and do something else, and then run it again. It can only leave the coroutine where an awaitable is run. To run an awaitable within a coroutine, we use the await keyword. So, when we run await asyncio.sleep(1), the Python interpreter turns its attention away from the english_async() coroutine until asyncio.sleep() returns, which will happen after one second.

We cannot just run a coroutine like it is a function. Look:

[9]:
english_async(exclaim=True)
[9]:
<coroutine object english_async at 0x7fcb7e5d2f40>

We get back a coroutine. To run it, we need a running event loop. An event loop enables asynchronous computing by listening for requests to do something, and then dispatching resources to do the requested calculation. Each thread (which you can think of for our purposes as one core of your CPU) can have either zero or one event loops. If you are running a Jupyter notebook, there is an active event loop; that is how JupyterLab runs, waiting for you to execute a cell. If you are not in a Jupyter notebook, you probably do not have an event loop running, so you need to start one. We will discuss how to start and run an event loop outside of JupyterLab later in this lesson. For now, we will assume you have a running event loop, as you do in a Jupyter notebook.

To run the coroutine, you can create a task using asyncio.create_task(). Note that “calling” a coroutine like a function returns a coroutine, so the argument you pass into asyncio.create_task() is how you would call the coroutine, including all arguments and keyword arguments. Upon creation, the coroutine is run.

[10]:
task_english = asyncio.create_task(english_async(exclaim=True))
Hello,
world!

You can access the return value of the coroutine using the result() method of the task. Of course, you should first check the done() method of the task to see if it has completed.

[11]:
task_english.done()
[11]:
True

And we can safely retrieve the result.

[12]:
task_english.result()
[12]:
'The message was a greeting to the world.'

Now, let’s make another coroutine that says the same greeting in Spanish. For demonstration purposes, this function will only wait a half second between the two words.

[13]:
async def spanish_async(exclaim=False):
    print(("  ¡" if exclaim else "  ") + "Hola, ")
    await asyncio.sleep(0.5)
    print("  mundo" + ("!" if exclaim else "."))

    return("El mensaje fue un saludo al mundo.")

We can run this coroutine as we did for the English one.

[14]:
task_spanish = asyncio.create_task(spanish_async(exclaim=True))
  ¡Hola,
  mundo!

With asynchronous computing, we can run the two coroutines concurrently! There are several ways to do this. First, we can create tasks one after another. The first task is created, “Hello,” is printed, and then the second task is created. (This time, we won’t exclaim.)

[15]:
task_english = asyncio.create_task(english_async())
task_spanish = asyncio.create_task(spanish_async())
Hello,
  Hola,
  mundo.
world.

Because the delay is shorter for the Spanish version, the entire message gets printed before the English message is complete.

As another option, we can gather the coroutines together using asyncio.gather().

[16]:
task_english_spanish = asyncio.gather(english_async(), spanish_async())
Hello,
  Hola,
  mundo.
world.

To get the return values, we gain use task_english_spanish.result().

[17]:
task_english_spanish.result()
[17]:
['The message was a greeting to the world.',
 'El mensaje fue un saludo al mundo.']

Note that the result is the return values from the two coroutines as a list.

Canceling a task

Once a task is created, it may be interrupted and canceled using the cancel() method of the task. For example, we can cancel the English greeting before the second word comes out.

[18]:
# Create the task
task_english = asyncio.create_task(english_async())

# Wait a half second
await asyncio.sleep(0.5)

# Cancel the task
successfully_canceled = task_english.cancel()
Hello,

Note that the cancel() method requests a cancellation, but cancellation is not guaranteed. You should read the asyncio documentation for more information.

A canceled job will both be marked as done and canceled.

[19]:
task_english.done(), task_english.cancelled()
[19]:
(True, True)

Since it was not allowed to return, though, the result will be a CancelledError.

[20]:
task_english.result()
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
Input In [8], in english_async(exclaim)
      2 print("Hello, ")
----> 3 await asyncio.sleep(1)
      4 print("world" + ("!" if exclaim else "."))

File ~/opt/anaconda3/lib/python3.9/asyncio/tasks.py:652, in sleep(delay, result, loop)
    651 try:
--> 652     return await future
    653 finally:

CancelledError:

During handling of the above exception, another exception occurred:

CancelledError                            Traceback (most recent call last)
Input In [20], in <cell line: 1>()
----> 1 task_english.result()

CancelledError:

Running without an existing event loop

If you do not have a running event loop on your thread, which will typically be the case if you are running outside of JupyterLab, you need to start an event loop. Fortunately, asynchio provides a convenient way to start (and automatically terminate upon completion of all coroutines) with its asyncio.run() function. To use it, define a coroutine that awaits all of the tasks you want to run and then pass that coroutine as an argument to asyncio.run(). For example, to run the English and Spanish greetings concurrently, do the following.

async def main():
    gathered = asyncio.gather(english_async(), spanish_async())
    await gathered

    return gathered.result()


asyncio.run(main())

Receiving data asynchronously

Now that we understand how asynchrony works in Python, let’s receive some data! We’ll of course start by shaking hands with Arduino.

[21]:
HANDSHAKE = 0
VOLTAGE_REQUEST = 1
RED_LED_ON = 2
RED_LED_OFF = 3
YELLOW_LED_ON = 4
YELLOW_LED_OFF = 5
ON_REQUEST = 6
STREAM = 7
READ_DAQ_DELAY = 8

# Windows users may need to give COM port for find_arduino()
port = find_arduino()

# Connect and handshake
arduino = serial.Serial(port, baudrate=115200)
handshake_arduino(arduino, print_handshake_message=True)
Handshake message: Handshake message received.

Now we can write a coroutine to acquire data. A few comments on how it works.

  1. We will read the data in chunks using the functions we wrote at the beginning of this lesson.

  2. I read in the first few messages sent from Arduino after turning on the stream and discard them, just to ensure the input buffer of my computer is cleared and we’re getting good clean reads.

  3. We sleep between acquisitions. I choose to sleep about 80% of the time of the acquisitions. This ensures that I will never have too many bytes in the input buffer, but I am still not checking as often as I could be. (Note that the read_all_newlines() function will take longer to run than the read_all() function because it has to wait until Arduino sends its final newline. It is blocking while it is waiting. This should not be a major slowdown, though.)

  4. The function takes an input reader, which specifies which function we want to use to read in the serial data. By default, we use read_all_newlines() because it does not have the aforementioned issues on Windows.

  5. I set up a dictionary to hold the data that gets updated with data as it is read.

[22]:
# Set up data dictionary
data = dict(time_ms=[], voltage=[])


async def daq_stream_async(
    arduino,
    data,
    n_data=100,
    delay=20,
    n_trash_reads=5,
    n_reads_per_chunk=4,
    reader=read_all_newlines,
):
    """Obtain `n_data` data points from an Arduino stream
    with a delay of `delay` milliseconds between each."""
    # Specify delay
    arduino.write(bytes([READ_DAQ_DELAY]) + (str(delay) + "x").encode())

    # Turn on the stream
    arduino.write(bytes([STREAM]))

    # Read and throw out first few reads
    i = 0
    while i < n_trash_reads:
        _ = arduino.read_until()
        i += 1

    # Receive data
    read_buffer = [b""]
    while len(data["time_ms"]) < n_data:
        # Read in chunk of data
        raw = reader(arduino, read_buffer=read_buffer[0], n_reads=n_reads_per_chunk)

        # Parse it, passing if it is gibberish
        try:
            t, V, read_buffer[0] = parse_read(raw)

            # Update data dictionary
            data["time_ms"] += t
            data["voltage"] += V
        except:
            pass

        # Sleep 80% of the time before we need to start reading chunks
        await asyncio.sleep(0.8 * n_reads_per_chunk * delay / 1000)

    # Turn off the stream
    arduino.write(bytes([ON_REQUEST]))

    return pd.DataFrame(
        {"time (ms)": data["time_ms"][:n_data], "voltage (V)": data["voltage"][:n_data]}
    )

To acquire data using this coroutine, we create a task.

[23]:
daq_task = asyncio.create_task(daq_stream_async(arduino, data, n_data=1000, delay=20))

We can retrieve the data frame from the task’s result and make a plot.

[24]:
# Get the data from from the result
df = daq_task.result()

# Convert milliseconds to seconds
df['time (sec)'] = df['time (ms)'] / 1000

# Plot!
p = bokeh.plotting.figure(
    x_axis_label='time (s)',
    y_axis_label='voltage (V)',
    frame_height=175,
    frame_width=500,
    x_range=[df['time (sec)'].min(), df['time (sec)'].max()],
)
p.line(source=df, x='time (sec)', y='voltage (V)')

bokeh.io.show(p)

App with live updates

While the above approach is useful, it leaves a bit to be desired, since we are not updating the plot in real time. Only after data acquisition is finished can we make a plot. We would rather have the plot update in real time, together with a button to start and stop data acquisition. In order to do that, we need to connect a Bokeh plot to a periodic callback, which constantly checks the dictionary of acquired data and updates the plot accordingly.

As an example of how a Bokeh app can use a periodic callback for streaming, I build one below to dynamically plot a random walk. The walk will proceed with a dot doing the walk and the trail behind it represented as a line. To build a Bokeh app, we need to write a function the controls the app. The function for the random walker is shown below with an explanation following immediately.

[25]:
def random_walk(doc):
    """Bokeh app for a dynamic random walk of 1000 steps."""
    rg = np.random.default_rng(3252)

    p = bokeh.plotting.figure(
        frame_width=200,
        frame_height=200,
        x_range=[-20, 20],
        y_range=[-20, 20],
    )

    # Use ColumnDataSources for data for populating glyphs
    source_line = bokeh.models.ColumnDataSource({"x": [0], "y": [0]})
    source_dot = bokeh.models.ColumnDataSource({"x": [0], "y": [0]})
    line = p.line(source=source_line, x="x", y="y")
    dot = p.circle(source=source_dot, x="x", y="y", color="tomato", size=7)

    @bokeh.driving.linear()
    def update(step):
        if step > 1000:
            doc.remove_periodic_callback(pc)
        else:
            theta = rg.uniform(0, 2 * np.pi)
            new_position = {
                "x": [source_dot.data["x"][0] + np.cos(theta)],
                "y": [source_dot.data["y"][0] + np.sin(theta)],
            }
            source_line.stream(new_position)
            source_dot.data = new_position

    doc.add_root(p)

    # Add a periodic callback to be run every 20 milliseconds
    pc = doc.add_periodic_callback(update, 20)

The argument of the function (traditionally called doc) is accessed to add any plots (or other Bokeh) to app and to add the callbacks. We first set up the figure and set up the data sources for the dot and for the line. Next, we set up the data sources using Bokeh’s ColumnDataSource. So far in the bootcamp, we have use Pandas data frames as the source for plots. Under the hood, Bokeh automatically converts those to its ColumnDataSource data type. This data type may be dynamically updated in a Bokeh app, which is exactly what we want. After the data sources are set up, we set up an update function (we call it update() here, but it could have any name). This is what will be called each time the periodic callback triggers. In this case, we decorate the function with @bokeh.driving.linear(). This results in the argument of update, step, being advanced by one every time the function is called. This way we can keep track of how many steps were taken. In the update function, if we have exceeded the number of desired steps, we cancel the periodic callbacks. Otherwise, we compute the next step of the random walk by computing a random angle for the step. We update the position of the walker by adding the step to it. Finally, we update the data sources for the dot and line. For the line, we use the stream() method. This results in Bokeh only appending new data to the data source instead of pushing through the whole data set for the plot each time. For the dot, since it is only plotted as a single position, we update the source data to be the dot position.

To run our app, we use bokeh.io.show(). We should also include the URL of the notebook (specified above in the input cell; you can see the URL by looking at the top of your browser).

[26]:
bokeh.io.show(random_walk, notebook_url=notebook_url)

We can use this technique to build an app for acquiring voltages coming out of the potentiometer. Our strategy for building our app is this:

  • Set up a dictionary containing lists of data

  • Asynchronously collect data from Arduino that updates the data dictionary

  • Set up a periodic callback so Bokeh updates the plot from the data dictionary

To do this, we need to keep track of which data are included on the plot and which are new. Therefore, the data dictionary also contains a variable to remember how long the time point and voltage lists were the last time the plot was rendered.

[27]:
# Set up data dictionary
data = dict(prev_array_length=0, time_ms=[], voltage=[])

Next, we build the plotting app. Because the app must have a call signature app(doc), I like to write a function that returns an app. This allows me to have a more convenient API for specifying properties of the app. This app is essentially like the random walk app, except that we pull data out of the data dictionary as needed. We also have a rollover parameter, which specifies the maximum number of data points to be displayed on the plot. Only the most recent data points are displayed. For time series data, like we’re plotting here, this results in a “scroll” across the plot, kind of like a stock ticker.

I have also included a keyword argument for the delay between plot updates. If the delay is too short, your computer will struggle trying to render the Bokeh plot at a high rate. In my experience, plots that are updated every 100 ms or less look like essentially continuous updates to the eye, so I use a plot delay of 90 ms.

[28]:
def potentiometer_app(data, n_data=100, rollover=400, plot_update_delay=90):
    """Return a function defining a Bokeh app for streaming
    data up to `n_data` data points. A maximum of `rollover`
    data points are shown at a time.
    """
    def _app(doc):
        # Instatiate figures
        p = bokeh.plotting.figure(
            frame_width=500,
            frame_height=175,
            x_axis_label="time (s)",
            y_axis_label="voltage (V)",
            y_range=[-0.2, 5.2],
        )

        # No padding on x_range makes data flush with end of plot
        p.x_range.range_padding = 0

        # Start with an empty column data source with time and voltage
        source = bokeh.models.ColumnDataSource({"t": [], "V": []})

        # Put a line glyph
        r = p.line(source=source, x="t", y="V")

        @bokeh.driving.linear()
        def update(step):
            # Shut off periodic callback if we have plotted all of the data
            if step > n_data:
                doc.remove_periodic_callback(pc)
            else:
                # Update plot by streaming in data
                source.stream(
                    {
                        "t": np.array(data['time_ms'][data['prev_array_length']:]) / 1000,
                        "V": data['voltage'][data['prev_array_length']:],
                    },
                    rollover,
                )
                data['prev_array_length'] = len(data['time_ms'])

        doc.add_root(p)
        pc = doc.add_periodic_callback(update, plot_update_delay)


    return _app

Now, to put the app to use! We need to show the app, and then create a task to acquire the data. The plot is then updated live! (Note that this is not viewable in the static HTML version of this lesson.)

[29]:
n_data = 1000

bokeh.io.show(potentiometer_app(data, n_data=n_data), notebook_url=notebook_url)
daq_task = asyncio.create_task(daq_stream_async(arduino, data, n_data=n_data, delay=20))

We can now watch data acquisition in real time! In the next lesson, we will go a few steps further: We will make an app with controls for starting, stopping, and saving acquisition that can stand alone on its own page in a browser (outside of a Jupyter notebook).

Before we end this lesson, we have to do the all-important closing of the serial connection!

[30]:
arduino.close()

Computing environment

[31]:
%load_ext watermark
%watermark -v -p numpy,pandas,serial,bokeh,jupyterlab
Python implementation: CPython
Python version       : 3.9.12
IPython version      : 8.3.0

numpy     : 1.21.5
pandas    : 1.4.2
serial    : 3.5
bokeh     : 2.4.2
jupyterlab: 3.3.2