Controlling multiple pioreactors through a script

How can I control multiple pioreactors through a script? This is an example of my current script:

from time import perf_counter

from pioreactor.background_jobs.dosing_control import DosingController
from pioreactor.background_jobs.temperature_control import TemperatureController
from pioreactor.utils.timing import RepeatedTimer
from pioreactor.whoami import get_unit_name, get_latest_experiment_name

HOURS_TO_SECONDS = 3600

class Schedule:
    def __init__(self, dc=None, **kwargs):
        # hold a reference to the dosing controller so main_loop can edit it
        self.dc = dc

    def main_loop(self):
        self.current_time = perf_counter()
        growth_rate = self.get_latest_growth_rates()  # helper defined elsewhere in my script
        if growth_rate > self.min_growth_rate:
            # main loop logic here
            # I change the automation parameters like below
            self.dc.automation_job.target_normalized_od = 1  # example

    def background_loop(self):
        self.current_time = perf_counter()
        if self.time_since_main_loop() - 3600 >= 0:
            pass
            # some logic

dc = DosingController(
    "turbidostat",
    duration=1,
    unit=get_unit_name(),
    experiment=get_latest_experiment_name(),
)
tc = TemperatureController(
    "thermostat",
    target_temperature=30,
    unit=get_unit_name(),
    experiment=get_latest_experiment_name(),
)
sc = Schedule(dc=dc)

main_loop = RepeatedTimer(24 * HOURS_TO_SECONDS, sc.main_loop).start()
background_loop = RepeatedTimer(1 * HOURS_TO_SECONDS, sc.background_loop).start()

dc.block_until_disconnected()

So far I have only been controlling the leader itself, so I don’t really know how to scale this up or send commands to a worker. How would I go about telling every worker to start a chemostat at some initial value?

How can I control multiple pioreactors through a single script? Also, how should I structure the code? For example, I was thinking each pioreactor might be its own object and then all the pioreactors could be stored in a single Python list.

This is quite easy. You could use the UI, but if you are on the leader, you can also use:

pios run dosing_control --automation-name chemostat --volume 1.0 --duration 60

Note the pios (with an s) vs pio. The former executes the command on all worker pioreactors in the cluster, whereas the latter runs it on the local pioreactor.
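
For example, with the built-in stirring job:

pio run stirring    # starts stirring on this Pioreactor only
pios run stirring   # starts stirring on every worker in the cluster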


What about your script? I think the most straightforward way is to turn it into a job so that, like above, you can do:

pios run my_script --arg 1 --arg 2

and also start it from the UI. However, this does mean putting the same code onto each Raspberry Pi in your cluster.


First, let’s turn your script into a job. Copy your script file into the plugins folder:

cp my_script.py ~/.pioreactor/plugins

Edit the file: put the controller code into a new function named click_my_script (this name can be changed), and wrap it with @click.command(name="my_script"). This is shown below:

from time import perf_counter

from pioreactor.background_jobs.dosing_control import DosingController
from pioreactor.background_jobs.temperature_control import TemperatureController
from pioreactor.utils.timing import RepeatedTimer
from pioreactor.whoami import get_unit_name, get_latest_experiment_name

HOURS_TO_SECONDS = 3600

class Schedule:
    def __init__(self, dc=None, **kwargs):
        # hold a reference to the dosing controller so main_loop can edit it
        self.dc = dc

    def main_loop(self):
        self.current_time = perf_counter()
        growth_rate = self.get_latest_growth_rates()  # helper defined elsewhere in my script
        if growth_rate > self.min_growth_rate:
            # main loop logic here
            # I change the automation parameters like below
            self.dc.automation_job.target_normalized_od = 1  # example

    def background_loop(self):
        self.current_time = perf_counter()
        if self.time_since_main_loop() - 3600 >= 0:
            pass
            # some logic

### start of new code

import click

@click.command(name="my_script")  # the name field is used in the invocation `pio run X`
def click_my_script():
    dc = DosingController(
        "turbidostat",
        duration=1,
        unit=get_unit_name(),
        experiment=get_latest_experiment_name(),
    )
    tc = TemperatureController(
        "thermostat",
        target_temperature=30,
        unit=get_unit_name(),
        experiment=get_latest_experiment_name(),
    )
    # pass dc in, since it is no longer a module-level global
    sc = Schedule(dc=dc)

    main_loop = RepeatedTimer(24 * HOURS_TO_SECONDS, sc.main_loop).start()
    background_loop = RepeatedTimer(1 * HOURS_TO_SECONDS, sc.background_loop).start()

    dc.block_until_disconnected()

Now, you should be able to do pio run my_script on the command line, and this code will start running!

The next step would be to move this same Python file to each worker's ~/.pioreactor/plugins folder. Once that is in place, you can do pios run my_script to start the job on all workers!
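
If your CLI version includes it, pios cp is a convenient way to distribute the file (treat this as a sketch; plain scp to each worker also works):

pios cp ~/.pioreactor/plugins/my_script.py   # copy the file to the same path on all workers
pios run my_script                           # then start the job everywhere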

Finally, you can add this to the UI, too: Adding your plugins to the web interface | Pioreactor Docs


There’s not yet an API that looks like the following (running on the leader):

p1 = Pioreactor("worker1")
p2 = Pioreactor("worker2")

ps = Cluster([p1, p2])

ps.start_stirring()
ps.start_dosing_control("chemostat", volume=1, duration=60)

Maybe in the future?

Thanks. That helps me think about better questions to ask.

So I can start a job/script on all pioreactors using pios run my_script --arg 1 --arg 2 on the leader. But I am not able (yet) to specifically assign job x_n to pioreactor y_n?

Is there a way to communicate from one pioreactor to another? For example, I could install a job (script) on every pioreactor, and as a job input I can assign it an ID and a worker/leader flag. I would need to be able to communicate between the leader and each worker to exchange information. Then, I can have each worker “listen” for commands and “report” information, and have the leader “send” commands and “listen” for information. The leader can decide how each worker should be operating, and then publish that information to a specific worker in an organized format.

I think MQTT might be useful for this. I could send a pio mqtt [sender] [receiver] [command/data]-style message from any one pioreactor in the cluster to any other pioreactor in the cluster. Is this approach feasible? I’m not sure how to send or read MQTT messages (yet) from within a script.

Also, if I wanted to have the pioreactor run a command on the Raspberry Pi, how can I do that from within a Python script? For example, if I wanted a script to invoke pio run script or pio mqtt example_message on the Raspberry Pi?

You can use a --units option in pios to specify workers, something like

pios run job_x_n --arg 1 --arg 2 --units worker1 --units worker3 

This can be done with the subprocess module. Ex:

from subprocess import call

# launches `pio run script` in the background, detached from this script
call("nohup pio run script &", shell=True)

will run pio run script as if it had been typed on the command line.
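
Combining subprocess with the --units flag above, you could even approximate the Cluster-style API from earlier with a small wrapper. This is purely a hypothetical sketch (Cluster and its methods are made-up names, not a Pioreactor API):

from subprocess import call

class Cluster:
    # hypothetical helper: fan a `pios run` command out to a fixed set of workers
    def __init__(self, units):
        self.units = units

    def run(self, job, extra=""):
        unit_flags = " ".join(f"--units {u}" for u in self.units)
        call(f"pios run {job} {extra} {unit_flags}", shell=True)

ps = Cluster(["worker1", "worker3"])
ps.run("stirring")
ps.run("dosing_control", "--automation-name chemostat --volume 1.0 --duration 60")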


Is there a way to communicate from one pioreactor to another?

This is primarily what MQTT is for. We use the Paho Python library for this. In your script, you could define some callbacks, and the leader can publish information to a topic that script code subscribes to. If you’re defining new automations / background jobs, a lot of this is wrapped up in pub_client and subscribe_and_callback: Introduction to background jobs | Pioreactor Docs
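
For illustration, here's roughly what the raw Paho version looks like (a minimal sketch assuming paho-mqtt 1.x and that it runs on the leader, where the MQTT broker lives; the topic is made up):

import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    # called for every message arriving on a subscribed topic
    print(f"{msg.topic}: {msg.payload}")

client = mqtt.Client()  # paho-mqtt 1.x style constructor
client.on_message = on_message
client.connect("localhost")  # the broker runs on the leader
client.subscribe("pioreactor/worker1/my_experiment/my_topic")
client.loop_forever()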

Is there a way to communicate from one pioreactor to another? For example, I could install a job (script) on every pioreactor, and as a job input I can assign it an ID and a worker/leader flag. […]

Thinking more about this, what were the applications you were thinking of with this system?

I want each of my workers to be operating how the leader tells them to, and regularly reporting info such as growth rates and NODs. The leader will collect the reported information, and then choose new and independent operating conditions for each worker. The new operating conditions for a pioreactor will depend not only on that pioreactor’s current environment and reported data, but also on the current environments and reported data of the other pioreactors.

I’m still trying to grok MQTT, but I think I might be able to use published_settings to send/receive data between the leader and each worker. Then I can pull all the data onto the leader, do some logic, and then send updated information to each worker.

The leader will collect the reported information, and then choose new and independent operating conditions for each worker.

Though possible, this is a bit off the recommended path, so “here be dragons”. Why do I say off the recommended path? Typically we want each worker to be able to act independently, with input from the user: starting & stopping jobs, editing settings, etc.

Logic that says “if worker1 NOD > 10, turn up worker1 heater” could live on the leader (I’ll show how below), but it could also live on workers. Even if we wanted cross-pioreactor chatter, like “if worker1 NOD > 10, turn up worker2 heater”, that logic can still live on workers (this is also a bit off the recommended path, but still possible).

So how could you implement some of this logic on the leader? First, let’s examine how we change settings on a worker. Jobs on workers listen to particular topics in MQTT. For example, a chemostat on worker1 listens to the topic string pioreactor/worker1/<experiment>/dosing_automation/volume/set. When anyone publishes a message to that topic, worker1 has logic that will update volume to the message’s value:

# change worker1's chemostat dosing volume to 1.5ml
# (`client` here is any connected MQTT client, e.g. a paho-mqtt Client)
client.publish(f"pioreactor/worker1/{get_latest_experiment_name()}/dosing_automation/volume/set", 1.5)

This works because volume is in the published_settings dictionary of the chemostat code.
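
For context, a job opts an attribute into this behaviour by listing it in published_settings, roughly like so (an illustrative sketch modelled on the custom-automation docs, not the actual chemostat source):

from pioreactor.automations.dosing.base import DosingAutomationJobContrib

class MyAutomation(DosingAutomationJobContrib):
    automation_name = "my_automation"
    published_settings = {
        # "settable": True is what makes the .../volume/set topic work
        "volume": {"datatype": "float", "settable": True, "unit": "mL"},
    }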

We can also listen for topics using the inverse of publish, which is subscribing. When a new NOD is published to MQTT from worker1, anyone who is subscribed to the topic pioreactor/worker1/<experiment>/growth_rate_calculating/od_filtered will get the message. We set up a callback to do something when we get a message. For example, in this code we execute the function _set_normalized_od whenever a new data point arrives on that topic.

One thing to note is that we can use + in an MQTT topic as a single-level wildcard: for example, subscribing to pioreactor/+/<experiment>/growth_rate_calculating/od_filtered matches that topic for every unit in the cluster.

Putting this all together, here’s a simple listener that can live on the leader, listening to NODs and executing volume changes for all workers (this is a sketch):

import json

from pioreactor.background_jobs.base import BackgroundJobContrib
from pioreactor.whoami import get_unit_name, get_latest_experiment_name

class Brains(BackgroundJobContrib):

    def __init__(self, unit, experiment):
        super().__init__(unit, experiment, plugin_name="brains")
        self.start_passive_listeners()

    def conditionally_change_volume(self, msg):
        # payloads are JSON blobs - this turns one into a dict
        data = json.loads(msg.payload)

        # extract _who_ sent this message, i.e. which worker
        unit = msg.topic.split("/")[1]

        if float(data["od_filtered"]) > 10:
            self.publish(f"pioreactor/{unit}/{self.experiment}/dosing_automation/volume/set", 1.5)

    def start_passive_listeners(self):
        self.subscribe_and_callback(
            self.conditionally_change_volume,
            f"pioreactor/+/{self.experiment}/growth_rate_calculating/od_filtered",
        )

b = Brains(unit=get_unit_name(), experiment=get_latest_experiment_name())
b.block_until_disconnected()