How can I save data in a dosing automation?

I’m playing around with your default turbidostat automation and I want to calculate the growth rate over time.

I calculate my growth rate every time a dilution event occurs, via ( end_normalized_od - start_normalized_od ) / ( end_time - start_time ).

  1. What function can I use to check the current time? I want to know the seconds between every dilution event.
  • (Edit) I am going to try to import perf_counter() from time and use it to get my timestamps (rough sketch below).
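Something like this is what I have in mind (just a sketch; the normalized OD numbers are placeholders):

    from time import perf_counter

    # sketch of the plan; the normalized OD values below are placeholders
    previous_time = perf_counter()
    previous_normalized_od = 1.0

    # ... later, when the next dilution event happens ...
    latest_normalized_od = 1.4
    now = perf_counter()

    delta_time = now - previous_time  # seconds since the last dilution event
    growth_rate = (latest_normalized_od - previous_normalized_od) / delta_time

    # carry these forward for the next interval
    previous_time, previous_normalized_od = now, latest_normalized_od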

Once I’ve calculated my growth rate for a given interval and a dilution event occurs, I want to save this data in a database somewhere, and maybe plot it on a chart in the ui too. What would this look like?

Yea, perf_counter() works great to measure duration.


(This is my third time writing this, thinking about how to solve it in the most straightforward way.)

You can put arbitrary data into the Event that is returned from the execute function. For example, something like the below in your own custom dosing automation could work:

    import time

    from pioreactor.automations import events

    class MyCustomAutomation(DosingAutomationJob):
        ...

        def execute(self):
            # TODO: define self.previous_normalized_od and self.previous_time (e.g. in __init__)
            # your turbidostat dosing logic goes here
            ...
            delta_time = time.perf_counter() - self.previous_time
            my_growth_rate = (self.latest_normalized_od - self.previous_normalized_od) / delta_time
            # remember to update self.previous_time and self.previous_normalized_od here
            return events.DilutionEvent("I diluted!", {"my_growth_rate": my_growth_rate})

The data is then logged to the database table dosing_automation_events. To access it, you can open up the SQL console to the database with pio db, and run the following:

SELECT * FROM dosing_automation_events 
WHERE experiment = "x"

where x is your experiment name. You should see a data column with your data. The next part is extracting that and putting it into your UI, but that’s not too hard honestly. We can do that next if you proceed in this direction.


More docs on Events: Automation events | Pioreactor Docs

Thanks. Here’s my code:

    def execute(self) -> events.DilutionEvent:
        media_ml = 0
        alt_media_ml = 0
        waste_ml = 0

        results = self.run_dilution()
        self.update_run_settings()
        self.collect_post_run_data()

        if results:
            if self.first_run is True:
                self.first_run = False
                return events.DilutionEvent(
                    f"Latest Normalized OD = {self.latest_normalized_od_before_dosing:.2f} >= Target nOD = {self.target_normalized_od_before_dosing:.2f}; cycled {results:.2f} mL"
                    )
            if self.first_run is False:
                growth_per_interval = self.calculate_data()
                return events.DilutionEvent(
                    f"Latest Normalized OD = {self.latest_normalized_od_before_dosing:.2f} >= Target nOD = {self.target_normalized_od_before_dosing:.2f}; cycled {results:.2f} mL",
                    data = {
                        "growth_per_interval": growth_per_interval,
                        },
                    )

I’m calculating a growth_per_interval using a delta_norm_od / delta_time formula. I want to see a graph of how my growth_per_interval changes over time in my UI. I think the dilution event somehow passes data back to the database to store it somewhere, but I don’t know how this works. I think it’s a SQL database, but I am not really sure what this means other than “popular database” and "pronounced as ‘sequel’ ". How can I log an existing data type to this database? How can I log a new data type to this database that doesn’t currently exist?

How can I then create a graph of how this data changes over time and view it in the UI?

Also, I can see that data is being passed through the events in the form:

data={
    "string_A": string_B,
},

What does string_A do and what does string_B do?

I haven’t actually tried running this code because I’m unsure how the database works and don’t want to mess something up.

I think the dilution event somehow passes data back to the database to store it somewhere, but I don’t know how this works.

This is taken care of in another job you may have seen, mqtt_to_db_streaming. This job looks for events, like the DilutionEvent, and stores them in the database.
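Conceptually, it does something like the sketch below for each event message it receives over MQTT. To be clear, this is not the actual mqtt_to_db_streaming implementation - the payload shape and database path here are assumptions for illustration:

    import json
    import sqlite3

    DB_PATH = "/home/pioreactor/.pioreactor/storage/pioreactor.sqlite"  # assumed default location

    def store_event(experiment: str, unit: str, payload: bytes) -> None:
        """Rough idea of what happens when an automation event arrives over MQTT."""
        event = json.loads(payload)  # e.g. {"event_name": "DilutionEvent", "data": {...}}
        with sqlite3.connect(DB_PATH) as con:
            con.execute(
                "INSERT INTO dosing_automation_events (timestamp, experiment, pioreactor_unit, event_name, data) "
                "VALUES (datetime('now'), ?, ?, ?, ?)",
                (experiment, unit, event["event_name"], json.dumps(event.get("data"))),
            )

    # the real job subscribes to the MQTT event topics and calls something like
    # store_event(...) for every message it sees.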

I think it’s a SQL database, but I am not really sure what this means other than “popular database” and "pronounced as ‘sequel’ "

Spot on! The mqtt_to_db_streaming job puts data into a SQL database (specifically, SQLite). SQL databases are made up of tables of data, and it’s pretty easy to add new tables to store new data.
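If you’re curious, you can also peek at those tables from Python with the standard-library sqlite3 module. A read-only sketch (the database path is the usual location, but adjust it if your install differs):

    import sqlite3

    # open the Pioreactor database read-only; the path is an assumption, adjust if needed
    con = sqlite3.connect(
        "file:/home/pioreactor/.pioreactor/storage/pioreactor.sqlite?mode=ro", uri=True
    )

    # SQLite keeps its list of tables in the sqlite_master catalog
    for (name,) in con.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    ):
        print(name)  # dosing_automation_events, plus the rest of the Pioreactor tables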

How can I log an existing data type to this database? How can I log a new data type to this database that doesn’t currently exist?

It’s a bit involved. My goal in my above response is to minimize how much we need to do to get a working graph in the UI. I’ll explain as we go.

What does string_A do and what does string_B do?

In this case, data is just a Python dictionary, so we can put whatever (key, value) pairs we like in it. "string_A" is the key, and string_B is the value (it can be a number, a string, or anything really). The job mqtt_to_db_streaming will take this dictionary and put it into the database for us. Specifically, it will put it into the table dosing_automation_events.
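For example, just to show the shape of what ends up in the data column (it gets serialized to JSON):

    import json

    # the dictionary attached to the event ...
    data = {"my_growth_rate": 0.042}

    # ... is stored as a JSON string in the data column of dosing_automation_events
    print(json.dumps(data))  # {"my_growth_rate": 0.042}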

Later, we will extract what we need from the data column to display in the UI.

I haven’t actually tried running this code because I’m unsure how the database works and don’t want to mess something up.

You will be safe to run it. It’s not really possible to mess up the database from this code.

Ok, I am storing my data in the format:
data = { "growth_per_interval": growth_per_interval, }

and storing it through events.DilutionEvent.

Here is my current .yaml. The brackets indicate where I think I should be making a change, but am not quite sure what to put in there.

---
data_source: [od_readings_filtered] # SQL table
chart_key: growth_per_interval
title: Growth Rate per Interval
mqtt_topic: [growth_rate_calculating/od_filtered]
source: app
payload_key: [od_filtered]
y_axis_label: Growth Rate / Interval Time
interpolation: stepAfter
y_axis_domain: [-0.02, 0.1]
lookback: 24
fixed_decimals: 4

Okay, next, we need to specify (in yaml) what database table it should look at. The data is in a format right now that’s not directly usable by the UI, so we are going to create a VIEW on top of this table that presents it better.

But first, here’s an example. I open up pio db, and enter the following:

SELECT timestamp, experiment, event_name, pioreactor_unit, data 
FROM dosing_automation_events;

I get something like this (your data will look slightly different, but if you squint you should see the similarity). The data column is a JSON representation of the Python dictionary we talked about above (JSON is just a serialization format):

| timestamp | experiment | event_name | pioreactor_unit | data |
|---|---|---|---|---|
| 2022-12-09T21:28:32.465374Z | demoo | NoEvent | leader6 | |
| 2022-12-09T21:28:58.982423Z | demoo | NoEvent | leader6 | |
| 2023-01-04T20:32:32.891195Z | pwm tests | NoEvent | worker1 | |
| 2023-01-04T20:35:32.485015Z | pwm tests | NoEvent | worker1 | |
| 2023-01-04T20:40:23.109093Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:41:05.931800Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:43:05.824067Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:45:05.694384Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:47:05.945823Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:49:05.622566Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:51:05.792569Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:53:05.851546Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:55:05.709735Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:57:05.614472Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:59:06.747484Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T21:01:05.720854Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T21:03:05.808914Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T21:05:05.519326Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |

Let’s apply a WHERE filter to filter out the empty data rows:

SELECT timestamp, experiment, event_name, pioreactor_unit, data 
FROM dosing_automation_events
WHERE data != "";

| timestamp | experiment | event_name | pioreactor_unit | data |
|---|---|---|---|---|
| 2023-01-04T20:40:23.109093Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:41:05.931800Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:43:05.824067Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:45:05.694384Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:47:05.945823Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:49:05.622566Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:51:05.792569Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:53:05.851546Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:55:05.709735Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:57:05.614472Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T20:59:06.747484Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T21:01:05.720854Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T21:03:05.808914Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T21:05:05.519326Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T21:07:05.883444Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |
| 2023-01-04T21:09:05.822024Z | pwm tests | DilutionEvent | worker1 | {"volume_actually_cycled": 0.5} |

Okay, great, but in order for the UI to display the data, we need a single table column of numbers, not this JSON data. We can use the SQL function json_extract to extract the field we want:

  SELECT 
    timestamp, 
    experiment, 
    pioreactor_unit, 
    event_name, 
    data, 
    json_extract(data, '$.volume_actually_cycled') as volume_cycled 
  FROM dosing_automation_events 
  WHERE data != "";

| timestamp | experiment | pioreactor_unit | event_name | data | volume_cycled |
|---|---|---|---|---|---|
| 2023-01-04T20:40:23.109093Z | pwm tests | worker1 | DilutionEvent | {"volume_actually_cycled": 0.5} | 0.5 |
| 2023-01-04T20:41:05.931800Z | pwm tests | worker1 | DilutionEvent | {"volume_actually_cycled": 0.5} | 0.5 |
| 2023-01-04T20:43:05.824067Z | pwm tests | worker1 | DilutionEvent | {"volume_actually_cycled": 0.5} | 0.5 |
| 2023-01-04T20:45:05.694384Z | pwm tests | worker1 | DilutionEvent | {"volume_actually_cycled": 0.5} | 0.5 |
| 2023-01-04T20:47:05.945823Z | pwm tests | worker1 | DilutionEvent | {"volume_actually_cycled": 0.5} | 0.5 |
| 2023-01-04T20:49:05.622566Z | pwm tests | worker1 | DilutionEvent | {"volume_actually_cycled": 0.5} | 0.5 |
| 2023-01-04T20:51:05.792569Z | pwm tests | worker1 | DilutionEvent | {"volume_actually_cycled": 0.5} | 0.5 |
| 2023-01-04T20:53:05.851546Z | pwm tests | worker1 | DilutionEvent | {"volume_actually_cycled": 0.5} | 0.5 |
| 2023-01-04T20:55:05.709735Z | pwm tests | worker1 | DilutionEvent | {"volume_actually_cycled": 0.5} | 0.5 |
| 2023-01-04T20:57:05.614472Z | pwm tests | worker1 | DilutionEvent | {"volume_actually_cycled": 0.5} | 0.5 |
| 2023-01-04T20:59:06.747484Z | pwm tests | worker1 | DilutionEvent | {"volume_actually_cycled": 0.5} | 0.5 |
| 2023-01-04T21:01:05.720854Z | pwm tests | worker1 | DilutionEvent | {"volume_actually_cycled": 0.5} | 0.5 |

Great! This looks like how we want it. But this isn’t a table, it’s just a result from a query - we need to tell our database to create a permanent VIEW of this query. Let’s call this new VIEW growth_per_intervals.

CREATE VIEW growth_per_intervals AS 
  SELECT 
    timestamp, 
    experiment, 
    pioreactor_unit, 
    event_name, 
    data, 
    json_extract(data, '$.volume_actually_cycled') as volume_cycled 
  FROM dosing_automation_events 
  WHERE data != "";

Now, I can query this just like a table:

SELECT * FROM growth_per_intervals

Okay, now the yaml file.

  • We have the data_source now, it’s growth_per_intervals.
  • We have the data_source_column, it’s volume_cycled.
  • mqtt_topic we can’t do, just yet, maybe later.
  • payload_key is for MQTT, so we’ll skip that for now.

And I think that’s it. Try this out, and let me know if you are stuck somewhere or need more clarification!


There are other ways to get data into SQL in Pioreactor, but this is still the easiest for your context.

Thanks for the help.

I believe I got the query working, and I’ll update the .yaml file now. For some reason, I’m now getting an error when I try to run some of my custom automations that I haven’t seen before.

  • 10:50:15 pioreactor1 dosing automation ‘AltMediaTurbidostat’ object has no attribute ‘execute_io_actions’

I don’t have this issue when I try running the default Turbidostat, and I was running a slightly less modified version of this turbidostat a couple days ago without issue. I’m not sure why it is giving me that error now.

# -*- coding: utf-8 -*-

from __future__ import annotations

from pioreactor.automations import events
from pioreactor.automations.dosing.base import DosingAutomationJobContrib
from pioreactor.exc import CalibrationError
from pioreactor.utils import local_persistant_storage
from time import perf_counter

__plugin_summary__ = "An extension of the turbidostat automation to allow a second media source."
__plugin_version__ = "0.0.1"
__plugin_name__ = "Alt Media Turbidostat Automation"
__plugin_author__ = "realPeteDavidson"
__plugin_homepage__ = "https://docs.pioreactor.com"

class AltTrackingTurbidostat(DosingAutomationJobContrib):
    """
    Alternate Turbidostat mode - Try to keep cell density constant by dosing whenever the target_normalized_od is hit. Over time, the amount of alternate media per dose increases.
    """

    automation_name = "alt_tracking_turbidostat"
    published_settings = {
            "media_volume": {"datatype": "float", "settable": True, "unit": "mL"},
            "alt_media_volume": {"datatype": "float", "settable": True, "unit": "mL"},
            "alt_media_interval": {"datatype": "float", "settable": True, "unit": "min"},
            "alt_media_dose_increase": {"datatype": "float", "settable": True, "unit": "mL"},
            "duration": {"datatype": "float", "settable": True, "unit": "min"},
            "target_normalized_od": {"datatype": "float", "settable": True, "unit": "AU"},
            }

    def __init__(self, media_volume: float | str, alt_media_volume: float | str, alt_media_interval: float | str, alt_media_dose_increase: float | str, target_normalized_od: float | str, **kwargs) -> None:
        super().__init__(**kwargs)

        with local_persistant_storage("current_pump_calibration") as cache:
            if "media" not in cache:
                raise CalibrationError("Media pump calibration must be performed first.")
            elif "waste" not in cache:
                raise CalibrationError("Waste pump calibration must be performed first.")
            elif "alt_media" not in cache:
                raise CalibrationError("Alt_Media pump calibration must be performed first.")

        self.media = float(media_volume)
        self.alt_media = float(alt_media_volume)
        self.alt_media_interval = float(alt_media_interval)
        self.alt_media_dose_increase = float(alt_media_dose_increase)
        self.target_normalized_od = float(target_normalized_od)
        self.alt_media_counter = 0
        self.just_ran = False
        self.first_run = True
        self.run_start_time = 0
        self.run_start_nod = 1
        self.run_end_time = 1
        self.run_end_nod = 2

    def run_dilution(self):
        if self.latest_normalized_od >= self.target_normalized_od:
            self.latest_normalized_od_before_dosing = self.latest_normalized_od
            self.target_normalized_od_before_dosing = self.target_normalized_od
            results = self.execute_io_action(media_ml = self.media, alt_media_ml = self.alt_media, waste_ml = self.media + self.alt_media)
            self.just_ran = True
            return results
        else:
            return 0



    def update_run_settings(self):
        self.alt_media_counter += self.duration
        if self.alt_media_counter >= self.alt_media_interval and self.latest_normalized_od >= self.target_normalized_od:
            self.alt_media += self.alt_media_dose_increase
            self.alt_media_counter = 0

    def collect_post_run_data(self):
        if self.just_ran == True and self.latest_normalized_od < self.latest_normalized_od_before_dosing:
            if self.latest_normalized_od < self.run_start_nod:
                self.run_start_time = perf_counter()
                self.run_start_nod = self.latest_normalized_od
                self.just_ran = False
        if self.latest_normalized_od < self.run_start_nod:
            self.run_start_time = perf_counter()
            self.run_start_nod = self.latest_normalized_od

    def calculate_data(self):
        if self.run_start_time != 0:
            growth_per_interval = (self.run_end_nod - self.run_start_nod) / (self.run_end_time - self.run_start_time)
            return growth_per_interval
        else:
            return 0

    def execute(self) -> events.DilutionEvent:
        media_ml = 0
        alt_media_ml = 0
        waste_ml = 0

        results = self.run_dilution()
        self.update_run_settings()
        self.collect_post_run_data()

        if results:
            if self.first_run is True:
                self.first_run = False
                return events.DilutionEvent(
                    f"Latest Normalized OD = {self.latest_normalized_od_before_dosing:.2f} >= Target nOD = {self.target_normalized_od_before_dosing:.2f}; cycled {results:.2f} mL"
                    )
            if self.first_run is False:
                growth_per_interval = self.calculate_data()
                return events.DilutionEvent(
                    f"Latest Normalized OD = {self.latest_normalized_od_before_dosing:.2f} >= Target nOD = {self.target_normalized_od_before_dosing:.2f}; cycled {results:.2f} mL",
                    data = {
                        "growth_per_interval": growth_per_interval,
                        },
                    )

Ah, it’s a typo: execute_io_actions → execute_io_action


I had some coffee, just sat back down to look at the code, and just saw that was the issue. Not sure when this error crept into the code. :confused:

No need to explain to me - I’ve lost hours looking for an errant typo!

Ok. So this is my .yaml file:

---
data_source: growth_per_intervals # SQL table
chart_key: growth_per_intervals
title: Growth Rate per Interval
mqtt_topic: growth_rate_calculating/od_filtered
source: app
payload_key: od_filtered
y_axis_label: Growth Rate / Interval
interpolation: stepAfter
y_axis_domain: [-0.02, 0.1]
lookback: 6
fixed_decimals: 4

I went into the config and added growth_per_intervals=1, saved the config, and refreshed the web page. The chart is not showing up yet, however. I checked pioreactor.log and pioreactorui.log, but don’t see any errors relating to this (I am still seeing the previous issue about the malformed database, though; I haven’t restarted the unit yet).

You will also need a data_source_column attribute in the yaml that specifies what column in the table to display. In my example, it would be given the value volume_cycled.

However, that sync-config issue is a blocker here, too. Let’s fix that (I don’t think restarting would work):

sudo rm /var/www/pioreactorui/{huey.db,huey.db-wal,huey.db-shm}
touch /var/www/pioreactorui/{huey.db,huey.db-wal,huey.db-shm}
sudo chmod 770 /var/www/pioreactorui/{huey.db,huey.db-wal,huey.db-shm}
sudo systemctl restart huey.service
sudo systemctl restart lighttpd.service

There’s a potential bug fix for what happened here in the next release too, so I don’t expect this to happen again.

I just ran the code and it looks like it fixed the issue. I tried saving the config through the UI and it gave me a message saying it worked.


Ok, I just tried recreating the database view as follows:

CREATE VIEW graph_gpi AS 
SELECT timestamp, experiment, pioreactor_unit, event_name, data, json_extract(data, '$.growth_per_interval') as growth_per_interval
FROM dosing_automation_events
WHERE data != "";

My turbidostat automation is storing the calculated data in data = {"growth_per_interval": growth_per_interval} and sending this through events.DilutionEvent("string", data={...})

I’ve been busy bug-fixing the code, so I don’t know if I’ve actually saved any of the data I want yet.

So in this new view, my data_source will be graph_gpi, and my column_name will be growth_per_interval, correct?

I’d like to know a little bit more about what json_extract is doing. What does the $ operator do, and is this creating an additional column?

For example: json_extract(data, "$.variable") as "NEW_COLUMN_NAME"

Is this extracting the data from column("data") and creating a new column("NEW_COLUMN_NAME")? What function does the '$' serve?

Also, another side question: When is a plugin’s code “refreshed”? Let’s say I notice an error, go into the code, fix it, and then save the update. Do I need to quit my dosing automation and restart it, or will the updated code automatically be used next time it runs execute()? After typing this, I’m pretty sure that the original (buggy) code would be tied to the object during instantiation, so the answer is probably “you need to exit the dosing automation and restart it.”

So in this new view, my data_source will be graph_gpi, and my column_name will be growth_per_interval, correct?

Yea, that looks right.

Is this extracting the data from column(“data”) and creating a new Column(“NEW_COLUMN_NAME”)? What function does the ‘$’ serve?

That’s exactly it. $ is like a placeholder, a dummy variable, that represents the root of the parsed JSON document. If you had more complex data, like:

{'a': 1, 'b': [0, 2, 4]}
{'a': 2, 'b': [4, 6, 8]}
{'a': 3, 'b': [8, 10, 12]}

Then json_extract(data, '$.a') would give:

1
2
3

whereas json_extract(data, '$.b[2]') would give:

4
8
12
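If you want to convince yourself without touching the Pioreactor database, here’s a tiny self-contained check in Python (assuming your SQLite build includes the JSON1 functions, which recent versions do):

    import sqlite3

    con = sqlite3.connect(":memory:")  # throwaway in-memory database
    con.execute("CREATE TABLE demo (data TEXT)")
    con.executemany(
        "INSERT INTO demo (data) VALUES (?)",
        [
            ('{"a": 1, "b": [0, 2, 4]}',),
            ('{"a": 2, "b": [4, 6, 8]}',),
            ('{"a": 3, "b": [8, 10, 12]}',),
        ],
    )

    for row in con.execute(
        "SELECT json_extract(data, '$.a'), json_extract(data, '$.b[2]') FROM demo"
    ):
        print(row)  # (1, 4), then (2, 8), then (3, 12)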

you need to exit the dosing automation and restart it.

Yea, that’s right. Might be faster to iterate on the command line:

pio run dosing_control --automation-name alt_tracking_turbidostat --media_volume 1.0 --alt_media_volume 1.0 --alt_media_interval 10 --alt_media_dose_increase 1.0 --target_normalized_od 1.5

and ctrl-c to cancel. Whatever works for you.

Thanks, the explanation is helpful. I’m going back through the code and realizing I’m probably setting my duration too high. I want to collect some data after my dilutions occur, so I need to set my duration to 0.01 so that it will run more often. Since it is operating as a turbidostat, this shouldn’t (theoretically) make much of a difference. When a dilution isn’t performed, it will just attempt to collect data.

I haven’t tried such a low duration value before and I was wondering if having lower duration values might cause any issues.

I think the default for turbidostat is 0.5 minutes, which means “check every 30 s if I should pump”. 0.01 minutes (roughly every 0.6 s) seems low, but in theory it should work.

I got this error just now:

15:49:12 pioreactor1 get charts contrib Yaml error in 07_normalized_od_24_hours.yaml: Expected int | str, got float - at $.lookback

I wanted to zoom in on one of my graphs so I set the lookback value to below 1. I’m guessing this made it go from a string or int to a float.

Running duration = 0.01 seems to be working mostly well, but I think I might have an issue. Sometimes I see small spikes in OD during dilution events, so I don’t want to collect data during a dilution event. If my duration is less than the time it takes to run execute_io_action(), then I want to wait until the dilution event is over before I start collecting data. Is there a way to check whether a pump is currently running?

Also, I think it may have been queuing up multiple events, but I fixed this by adding a minimum time between dilution events. It might be useful to have some sort of function that could check when the last dilution event was, or if there is currently a dilution event in progress.
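For reference, here’s roughly how I added that minimum spacing (just a sketch; min_seconds_between_dilutions and last_dilution_time are attributes I defined myself, not Pioreactor settings):

    from time import perf_counter

    class SpacedTurbidostatSketch:
        """Sketch only: the spacing guard I added to my real automation's execute()."""

        def __init__(self):
            self.min_seconds_between_dilutions = 60.0   # my own knob, not a built-in setting
            self.last_dilution_time = -float("inf")     # so the very first dilution is allowed

        def execute(self):
            if perf_counter() - self.last_dilution_time < self.min_seconds_between_dilutions:
                # too soon after the previous dilution - skip dosing this cycle
                return None

            # ... run the dilution here, and if it actually dosed: ...
            self.last_dilution_time = perf_counter()
            # ... then build and return the DilutionEvent as before ...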

Oh yea, I think that’s a problem on my side, lookback should be able to be a float, not just an int. I’ll fix in the next release, but just stick with ints for now.