Creating an automation to control both temperature and dosing

Hi. I want to try controlling both temperature and dosing from a single automation, but this is new territory for me and I am not sure how to approach this or what conflicts might arise and why. If you have any advice/suggestions, or possible relevant terminology that I could google, that would be great! So far, I have been using the /pioreactor/automations/dosing/chemostat.py as a template and building everything off there, so I am feeling a lot more confident working with dosing and inheritance from DosingAutomationJob.

I am hoping to create an automation which can schedule when to update and run and update dosing (nutrient media, salt solution alternate media) and temperature control.

I am not really sure what questions I should be asking, so I tried writing out my thought process. Here is what I think I should generally do:

1. My class ChemostatAltMedia should be inheriting from both DosingAutomationJobContrib and Thermostat, or possibly TemperatureController (this would be inherited through Thermostat regardless, right?)

I would define my class like this: class ChemostatAltMedia(DosingAutomationJobContrib, Thermostat):. I see that there is an execute(self) function defined in both parent classes DosingAutomationJobContrib and Thermostat, and I am also defining execute(self) in child class ChemostatAltMedia. How are these conflicts resolved, and should I be concerned about these types of conflicts? I suspect that an instance attribute/function will overwrite inherited class attributes from a
parent, but what about conflicts when inheriting class attributes or functions from two different parents? Moreover, can I still call these inherited attributes/functions via something like self.DosingAutomationContrib.execute(), self.Thermostat.execute(), and self.execute()?

2. I will need to update how I instantiate my class.

So far, I have only been working with dosing, so I have been instantiating my classes via dc = DosingController("scheduled_chemostat_alt_media",...). I suspect I might need to expand this somehow, but I am not sure what that would look like. Maybe something like this?

if __name__ == "__main__": 
  from pioreactor.background_jobs.dosing_control import DosingController
  from pioreactor.background_jobs.temperature_control import TemperatureController
  
  dc = DosingControl("scheduled_chemostat_alt_media",...)
  dc = TemperatureController(dc,...)

3. Here is my code that I have written so far.
Edit: I cleaned up and removed a lot of code in a reply to this post.

Currently, it is giving me an error saying __init__() missing 1 required positional argument: 'target_temperature', but this goes away when I remove Thermostat from class ChemostatAltMedia(...). I am passing in a target_temperature argument to DosingController, but that is not resolving the issue. I have tried a couple different changes to the code without success, but I think I need to get a better idea of what is going on to troubleshoot this more effectively. I have been looking through the source code but havenā€™t had luck so far.

# -*- coding: utf-8 -*-
from __future__ import annotations

from pioreactor.automations import events
from pioreactor.automations.dosing.base import DosingAutomationJobContrib
from pioreactor.automations.temperature.thermostat import Thermostat # Question: Is this what I should be importing? Or is another automation more ideal?
from pioreactor.exc import CalibrationError # [TODO] Remove this line and see if anything breaks
from pioreactor.utils import local_persistant_storage
 

class ChemostatAltMedia(DosingAutomationJobContrib, Thermostat):
    """
    Chemostat mode - try to keep [nutrient] constant.
    """

    automation_name = "scheduled_chemostat_alt_media"
    published_settings = {
        "volume": {"datatype": "float", "settable": True, "unit": "mL"},
        "fraction_alt_media": {"datatype": "float", "settable": True, "unit": "%"},
        "duration": {"datatype": "float", "settable": True, "unit": "min"},
        "array": {"datatype": "array", "settable": True, "unit": "N/A, mL"},
    } 

    # [TODO] Remove array as a function argument.
    def __init__(self, media_ml: float, fraction_alt_media: float, target_temperature, salt_value_array, dose_value_array, array, **kwargs):
        super(ChemostatAltMedia, self).__init__(**kwargs)

        self.volume = float(media_ml)
        self.counter = 0
        self.alt_media_ml = 0 # [TODO] Remove this line and see if anything breaks
        self.media_ml = 0 # Remove this line and see if anything breaks
        self.alt_dosing_schedule = array # [TODO] Remove this line. 
        self.salt_value_array = salt_value_array # xxxx_value_array stores the future scheduled values
        self.dose_value_array = dose_value_array
        # xxxx_state_array keeps track of when different features should be run or updated
        self.dose_state_array = {
                'run_flag': True, 'run_interval': 20, 'run_counter': 0,
                'update_flag': True, 'update_interval': 30, 'update_counter': 0,
                'run_amount': 0.5, 'previous_run_amount': 0, 'name': 'nutrient_media',
                'interval_counter': 0}
        self.salt_state_array = {
                'run_flag': True, 'run_interval': 8, 'run_counter': 0,
                'update_flag': True, 'update_interval': 720, 'update_counter': 0,
                'run_amount': 0, 'previous_run_amount': 0, 'name': 'salt_media',
                'interval_counter': 0} # 720 update interval -> 12 hours
        # self.temp_state_array will control the temperature schedule. Current values are copied over from self.salt_state_array
        self.temp_state_array = {
                'run_flag': True, 'run_interval': 8, 'run_counter': 0,
                'update_flag': True, 'update_interval': 720, 'update_counter': 0,
                'run_amount': 0, 'previous_run_amount': 0, 'name': 'temp_control',
                'interval_counter': 0} # 720 update interval -> 12 hours


    def check_for_update(self, state_array):
        # This function takes in a state array and checks whether an update or dosing is required
        # [TODO] Add in a check for the type of state array. Temperature or LEDs may 'run' differently than dosings
        if self.counter - state_array['run_counter'] >= state_array['run_interval']:
            state_array['run_flag'] = True
            state_array['run_counter'] = self.counter
        if self.counter - state_array['update_counter'] >= state_array['update_interval']:
            state_array['update_flag'] = True
            state_array['update_counter'] = self.counter
        return state_array

    def run_dosing(self):
        # By default, no media should be added on any given run_dosing()
        media_ml = 0
        alt_media_ml = 0
        waste_ml = 0
        # Check what media is scheduled to be added and set the appropriate amount
        if self.dose_state_array['run_flag'] == True: # When media is required, set the amount to add and then turn off the flag
            media_ml = self.dose_state_array['run_amount'] 
            self.dose_state_array['run_flag'] = False 
        if self.salt_state_array['run_flag'] == True: # When salt is required, set the amount to add and then turn off the flag
            alt_media_ml = self.salt_state_array['run_amount']
            self.salt_state_array['run_flag'] = False
        # Calculate waste amount and execute dosing
        waste_ml = media_ml + alt_media_ml
        volume_actually_cycled = self.execute_io_action(alt_media_ml, media_ml, waste_ml)
        return volume_actually_cycled 
               

    def update_dosing_values(self, state_array):
        if state_array['update_flag'] == True:
            if state_array['name'] == 'salt_media':
                state_array['run_amount'] = self.salt_value_array[0][state_array['interval_counter']]
                state_array['run_interval'] = self.salt_value_array[1][state_array['interval_counter']]
                state_array['interval_counter'] += 1

            elif state_array['name'] == 'nutrient_media':
                state_array['run_amount'] = state_array['run_amount'] # Not currently in use.

            state_array['update_flag'] = False
        return state_array
            
    def update_temperature_values(self, state_array):
        # [TODO] Write code to update temperature values here
        return


    def execute(self) -> events.DilutionEvent:
       
        self.dose_state_array = self.check_for_update(self.dose_state_array) # Flag any values that are due to update
        self.dose_state_array = self.update_dosing_values(self.dose_state_array) # Update any values that are flagged for an update
        self.salt_state_array = self.check_for_update(self.salt_state_array)
        self.salt_state_array = self.update_dosing_values(self.salt_state_array)

        ## Eventually control temperature schedule ##
        # self.temp_state_array = self.check_for_update(self.temp_state_array)
        # self.temp_state_array = self.update_temperature_values(self.temp_state_array)

        volume_actually_cycled = self.run_dosing() # Run the dosing with updated values
        
        self.counter += 1 # Counter increments every 'duration' (i.e., 1 minute) and is used to track time elapsed since start
        return events.DilutionEvent(
            f"exchanged {volume_actually_cycled[0]}mL",
            data={"volume_actually_cycled": volume_actually_cycled[0]},
        )
    
if __name__ == "__main__": 
    from pioreactor.background_jobs.dosing_control import DosingController
    
    dc = DosingController(
        "scheduled_chemostat_alt_media",
        duration=1,
        fraction_alt_media=0,
        media_ml = 0.6, # currently not in use.
        volume=1.0,
        target_temperature = 25,
        array=[[5, 10, 20, 40, 60, 80], [0.5, 0, 1.0, 0, 1.5, 0]], # [TODO] Remove array as an argument
        salt_value_array = [[0, 0.1, 0, 0.1, 0, 0.1, 0, 0.1, 0, 0.1, 0, 0.1, 0, 0.1, 0, 0.1], [60, 55, 55, 25, 25, 15, 15, 10, 10, 7, 7, 5, 5, 3.6, 3.6, 2.5]],
        dose_value_array = [],
        unit="test_unit",
        experiment="test_experiment"
        )
    dc.block_until_disconnected()

I went back and removed most of the code related to dosing to clean it up. That code is currently working as expected and is not needed for the issue I am trying to troubleshoot.

# -*- coding: utf-8 -*-
from __future__ import annotations

from pioreactor.automations import events
from pioreactor.automations.dosing.base import DosingAutomationJobContrib
from pioreactor.automations.temperature.thermostat import Thermostat # Question: Is this what I should be importing? Or is another automation more ideal?
from pioreactor.exc import CalibrationError # [TODO] Remove this line and see if anything breaks
from pioreactor.utils import local_persistant_storage
 

class ChemostatAltMedia(DosingAutomationJobContrib, Thermostat):
    automation_name = "scheduled_chemostat_alt_media"
    published_settings = {"volume": {"datatype": "float", "settable": True, "unit": "mL"}, "fraction_alt_media": {"datatype": "float", "settable": True, "unit": "%"}, "duration": {"datatype": "float", "settable": True, "unit": "min"}, "array": {"datatype": "array", "settable": True, "unit": "N/A, mL"}, } 

    # [TODO] Remove array as a function argument.
    def __init__(self, media_ml: float, fraction_alt_media: float, target_temperature, salt_value_array, dose_value_array, array, **kwargs):
        super(ChemostatAltMedia, self).__init__(**kwargs)

        self.volume = float(media_ml)
        self.counter = 0
        self.alt_media_ml = 0 # [TODO] Remove this line and see if anything breaks
        self.media_ml = 0 # Remove this line and see if anything breaks
        self.alt_dosing_schedule = array # [TODO] Remove this line. 

    def run_dosing(self):
        volume_actually_cycled = self.execute_io_action(alt_media_ml=0, media_ml=0, waste_ml=0)
        return volume_actually_cycled 
               
            
    def update_temperature_values(self, state_array):
        # [TODO] Write code to update temperature values here
        return

    def execute(self) -> events.DilutionEvent:
        volume_actually_cycled = self.run_dosing() # Run the dosing with updated values
        self.counter += 1 # Counter increments every 'duration' (i.e., 1 minute) and is used to track time elapsed since start
        return events.DilutionEvent( f"exchanged {volume_actually_cycled[0]}mL", data={"volume_actually_cycled": volume_actually_cycled[0]}, )
    
if __name__ == "__main__": 
    from pioreactor.background_jobs.dosing_control import DosingController
    
    dc = DosingController(
        "scheduled_chemostat_alt_media", duration=1, fraction_alt_media=0, media_ml = 0.6, volume=1.0,
        target_temperature = 25,
        array=[], salt_value_array = [], dose_value_array = [], unit="test_unit", experiment="test_experiment"
        )
    dc.block_until_disconnected()

Can you tell me more about how you want the temperature / heating to behave?

Generally, different automation types (temperature, dosing, and LEDs) are designed to be isolated. That way, itā€™s not possible for automations to interfere with each other (imagine two chemostats trying to run, independent of each other - overflows could occur!).

My suggestion would be to structure your code as follows (your dosing code is near identical):

# -*- coding: utf-8 -*-
from __future__ import annotations

from pioreactor.automations import events
from pioreactor.automations.dosing.base import DosingAutomationJobContrib
from pioreactor.automations.temperature.thermostat import Thermostat 
from pioreactor.utils import local_persistant_storage
 

class ChemostatAltMedia(DosingAutomationJobContrib):
    automation_name = "scheduled_chemostat_alt_media"
    published_settings = {"volume": {"datatype": "float", "settable": True, "unit": "mL"}, "fraction_alt_media": {"datatype": "float", "settable": True, "unit": "%"}, "duration": {"datatype": "float", "settable": True, "unit": "min"}, "array": {"datatype": "array", "settable": True, "unit": "N/A, mL"}, } 

    # [TODO] Remove array as a function argument.
    def __init__(self, media_ml: float, fraction_alt_media: float, target_temperature, salt_value_array, dose_value_array, array, **kwargs):
        super(ChemostatAltMedia, self).__init__(**kwargs)

        self.volume = float(media_ml)
        self.counter = 0
        self.alt_media_ml = 0 # [TODO] Remove this line and see if anything breaks
        self.media_ml = 0 # Remove this line and see if anything breaks
        self.alt_dosing_schedule = array # [TODO] Remove this line. 

    def run_dosing(self):
        volume_actually_cycled = self.execute_io_action(alt_media_ml=0, media_ml=0, waste_ml=0)
        return volume_actually_cycled 
               

    def execute(self) -> events.DilutionEvent:
        volume_actually_cycled = self.run_dosing() # Run the dosing with updated values
        self.counter += 1 # Counter increments every 'duration' (i.e., 1 minute) and is used to track time elapsed since start
        return events.DilutionEvent( f"exchanged {volume_actually_cycled[0]}mL", data={"volume_actually_cycled": volume_actually_cycled[0]}, )
    


class MyTempAutomation(Thermostat):

    automation_name = "my_temp_automation"

    def __init__(self, parameter1, parameter2, **kwargs):
        super().__init__(**kwargs)
        ...

    def execute():
        self.logger.info("Changing heating etc.")
        ...




if __name__ == "__main__": 
    from pioreactor.background_jobs.dosing_control import DosingController
    from pioreactor.background_jobs.temperature_control import TemperatureController
    
    dc = DosingController(
        "scheduled_chemostat_alt_media", duration=1, fraction_alt_media=0, media_ml = 0.6, volume=1.0,
        target_temperature = 25,
        array=[], salt_value_array = [], dose_value_array = [], unit="test_unit", experiment="test_experiment"
        )

    tc = TemperatureController(
        "my_temp_automation",
        parameter1="", parameter2="",
        unit="test_unit", experiment="test_experiment"
    )
   # both tc and dc are running in the background at this point
    dc.block_until_disconnected()

Iā€™ve left the temperature logic empty as a placeholder. Note that it inherits from Thermostat. Maybe you were expecting it to inherit from TemperatureAutomationJobContrib, but Theromostat has some useful logic if we want to target some specific temperature. For example, hereā€™s an example of cycling the temperature between 30C and 40C over 24 hours. We wrote a blog article about it too.

I can help with the temperature logic bits, but please tell me more about the behaviour you want (and how/if it interacts with dosing?)

Thanks! I had not thought of creating a second class within the file.

For how I want it to behave, right now I am trying to create a way of scheduling temperature and dosing changes automatically. Currently, the way I would do this is going to the webpage/GUI and change the temperature and chemostat automations manually. Eventually, maybe Iā€™d just have an excel sheet that has a schedule of the settings I want and executes at the right time. Since I want to learn Python, I figured I could try and implement this feature myself.

I guess Iā€™m going to be reading up on class inheritance. I am currently trying to create a schedule take in formatted data where I can input schedule changes after ā€˜xā€™ durations. Iā€™m using self.counter to track how many durations have elapsed since starting the automation/experiment. I will want to make sure the two classes/automations are in sync. How would you recommend I generally structure such a code? Something like

class scheduler()
    __init__
    ta = temp_automation()
    dc = dose_automation()
    self.counter = 0
    __loop__
    ta.execute(counter)
    dc.execute(counter)
    self.counter += 1
    __end_loop__

class temp_automation(counter)
    # temperature code here
    return

class dose_automation(counter)
    # dosing code here
    return

if __name__ == "__main__": 
    
    sc = scheduler( [arguments])
    sc.block_until_disconnected()

Hereā€™s an example you can use. Goal: dose media/alt media, and update temperature, based on a schedule with irregular time intervals. Unlike the previous code in this thread, we going to avoid defining our own classes, but instead manipulate existing objects.

First, instead of the DosingAutomationJob running every duration minutes, we are going to turn this loop off, and let an outside object, (RepeatedTimer, defined later) run every minute and weā€™ll define a function, update, to perform any updates to automations.

from pioreactor.background_jobs.dosing_control import DosingController
from pioreactor.background_jobs.temperature_control import TemperatureController
from pioreactor.utils.timing import RepeatedTimer
from pioreactor.whoami import get_unit_name
from pioreactor.whoami import get_latest_experiment_name

# "silent" automation is basically: do nothing. We'll manually invoke dosing later in the script
dc = DosingController(
    "silent", 
    duration=None,  # None, i.e. don't loop.
    unit=get_unit_name(), 
    experiment=get_latest_experiment_name()
)

tc = TemperatureController(
    "thermostat",
    target_temperature=30, # some initial temp
    unit=get_unit_name(), 
    experiment=get_latest_experiment_name()
)

# our schedule - later this could come from an excel sheet, etc.
SCHEDULE = {
    1:  {'media': 0.5, 'alt_media': 0.0, 'temperature': 35},
    20:  {'media': 0.0, 'alt_media': 0.5, 'temperature': 30},
    40: {'media': 0.5, 'alt_media': 0.0, 'temperature': 35},
    50: {'media': 0.0, 'alt_media': 0.5, 'temperature': 30},
    70: {'media': 0.5, 'alt_media': 0.0, 'temperature': 35},
}

count = 0

def update():
    global count
    count += 1

    if count in SCHEDULE:

        # time to do things! First pull out data
        temperture = SCHEDULE[count]['temperature']
        media_per_dose = SCHEDULE[count]['media']
        alt_media_per_dose = SCHEDULE[count]['alt_media']
        waste = media_per_dose + alt_media_per_dose

        # then, make the updates
        tc.automation_job.set_target_temperature(temperture)
        dc.automation_job.execute_io_action(alt_media_ml=alt_media_per_dose, media_ml=media_per_dose, waste_ml=waste)

# this RepeatedTimer will run `update` every 60 seconds. 
scheduler = RepeatedTimer(60, update).start()

# block here.
dc.block_until_disconnected()

The reason why I didnā€™t recommend a class-based solution (like how we were writing things previously) is that i) we are trying to sync multiple automations (temperature and dosing) which suggests we need a higher-level control, and ii) the logic per automation is relatively simple. On ii) what I mean is, we are only dosing (on command), or updating a temperature. We arenā€™t adding new logic (ex: dosing when OD > 0.5, update temperature if growth rate < 0.1) - so instead we can re-use existing logic.

Let me know if this works, or if you have questions. Unfortunately, there arenā€™t many examples of Pioreactor code (yet), so this code may look like a surprise turn-of-events to you - but I hope itā€™s clear.

A good question is: can I replace RepeatedTimer with a for/while loop? Yes, but you have to be careful. RepeatedTimer takes into account how long itā€™s callback (update in our code) takes, so it will strictly run every 60 seconds. A naive loop that look like:

count = 0
while True: 
     update()
     sleep(60)

wonā€™t work, because update() may take 10-20 seconds to run if dosing occurs, and hence the schedule will be off after that. You could time how long update takes, and incorporate that, but that basically what RepeatedTimer is already doing.

This helped quite a bit, thanks! I did want to try adding in a bit more logic to control when/how settings should change, so I created a Scheduler class with the relevant logic and added it to the file. The Scheduler class has no parent class and is only used to perform calculations and schedule dosing, and then I pass the values calculated by Scheduler to the DosingController and TemperatureController objects. I am running a quick test run right now, but I think it is working as intended.

I am hoping to start an experiment in the next day or two (and will post my results on the forum), but I had a few last questions.

  1. How do I run my python script for multiple days without it shutting down if the terminal closes?

Edit2: Think I figured this one out.
I am running the program in the background with:
$ nohup /home/pioreactor/my_python_script.py > logfile_my_python_script.out &
and I verified it was running with:
$ ps ax | grep python

Will this conflict with the dc.block_until_disconnected() that I always have at the end of my python files? I will try looking at the source files later today and seeing if I can answer that myself.

  1. How do I record temperature and send it to the web/GUI so that it is saved in the experiment and the temperature graph at pioreactor1.local webpage shows real-time temperature?

Is this a setting that I can pass in to TemperatureController when I am running a python script through terminal, or is there some way I can accomplish this? Is this related to the experiment='test_experiment' argument that is used when creating the dc and tc controller objects?

Edit: I figured this one out. When I created a new experiment named ā€œtest_experimentā€, the temperature data recorded from the thermostat automation I was running in Python appeared in the GUI. Iā€™m guessing that by changing the experiment name in the GUI to match the experiment name in the automation makes the GUI pull up the correct data points?

  1. Also, is there any temperature controller behavior that I should know about when using the temperature controller?

For example, what happens if I set the target temperature below room temperature? Will it just turn off the heater?
Can I call tc.automation_job.set_target_temperature(...) as often as Update() runs, or should I only run it when the target temperature changes?

Thanks again for your help.

Here is my code (some of the values are currently set for testing/debugging). For readability, I removed the Scheduler class and posted that in a second code snippet.

from pioreactor.background_jobs.dosing_control import DosingController
from pioreactor.background_jobs.temperature_control import TemperatureController
from pioreactor.utils.timing import RepeatedTimer
from pioreactor.whoami import get_unit_name

####
#### Schedule class defined here
####

# Lists of setpoints for dosing, salting, and temperaturing
dose_value_array=[0.5]
salt_value_array=[0, 0.2, 0, 0.4, 0, 0.6, 0, 0.8, 0, 1, 0, 1.2, 0, 1.4, 0, 1.6, 0, 1.8, 0, 2, 0, 2.2, 0, 2.4, 0, 2.6, 0, 2.8, 0, 3, 0, 3.2, 0, 3.4, 0, 3.6]
temp_value_array=[21, 28, 34, 37, 34, 28]

sc = Schedule(salt_value_array, dose_value_array, temp_value_array)


# "silent" automation is basically: do nothing. We'll manually invoke dosing later.
dc = DosingController(
        "silent",
        duration=None,
        unit=get_unit_name(),
        experiment="test_experiment"
        )
tc = TemperatureController(
        "thermostat",
        target_temperature=30,
        unit=get_unit_name(),
        experiment="test_experiment"
        )


counter = 0

previous_temp = 0

def update():
    global counter
    global previous_temp
    run_value_settings = sc.run_execute()
    temperature = run_value_settings['temp_target']
    media_per_dose = run_value_settings['media_ml']
    alt_media_per_dose = run_value_settings['alt_media_ml']
    waste_ml = run_value_settings['waste_ml']

    if temperature is not previous_temp:
        tc.automation_job.set_target_temperature(temperature)
        previous_temp = temperature
    dc.automation_job.execute_io_action(alt_media_ml=alt_media_per_dose, media_ml=media_per_dose, waste_ml = waste_ml)
    counter += 1
# this RepeatedTimer will run `update` every 60 seconds

scheduler = RepeatedTimer(30, update).start() # normally RepeatedTimer(60, update).start()

# block here
dc.block_until_disconnected()
class Schedule():
    def __init__(self, salt_value_array, dose_value_array, temp_value_array, **kwargs):
        self.salt_value_array = salt_value_array
        self.dose_value_array = dose_value_array
        self.temp_value_array = temp_value_array
        self.dose_state_array = {
                'run_flag': True, 'run_interval': 20, 'run_counter': 0,
                'update_flag': True, 'update_interval': 60, 'update_counter': 0,
                'run_target': 0.5, 'previous_run_amount': 0, 'name': 'nutrient_media',
                'interval_counter': 0}
        self.salt_state_array = {
                'run_flag': True, 'run_interval': 20, 'run_counter': 0,
                'update_flag': True, 'update_interval': 60, 'update_counter': 0,
                'run_target': 0.1, 'previous_run_amount': 0, 'name': 'salt_media',
                'interval_counter': 0} # 720 update interval -> 12 hours
        self.temp_state_array = {
            'run_flag': True, 'run_interval': 20, 'run_counter': 0,
            'update_flag': True, 'update_interval': 480, 'update_counter': 0,
            'run_target': 28, 'previous_run_amount': 0, 'name': 'temp_control',
            'interval_counter': 0} # 480 update interval -> 8 hours

    def check_for_updates(self, state_array):
        if counter - state_array['run_counter'] >= state_array['run_interval']:
            state_array['run_flag'] = True
            state_array['run_counter'] = counter
        if counter - state_array['update_counter'] >= state_array['update_interval']:
            state_array['update_flag'] = True
            state_array['update_counter'] = counter
        return state_array


    def update_target_values(self, state_array):
        if state_array['update_flag'] == True:
            if state_array['name'] == 'salt_media':
                target_molarity = self.salt_value_array[state_array['interval_counter']]
                if target_molarity == 0:
                    state_array['run_target'] = 0
                elif target_molarity > 0:
                    values = self.calc_salt_dosing(target_molarity)
                    state_array['run_target'] = values[0]
                    state_array['run_interval'] = values[1]
                state_array['interval_counter'] += 1
                state_array['interval_counter'] %= (len(self.salt_value_array)+1) # loop around to begining to prevent out-of-bounds

            elif state_array['name'] == 'nutrient_media':
                state_array['run_target'] = state_array['run_target'] # Not currently in use.
            
            elif state_array['name'] == 'temp_control':
                state_array['run_target'] = self.temp_value_array[state_array['interval_counter']]
                state_array['interval_counter'] += 1
                state_array['interval_counter'] %= (len(self.temp_value_array)+1)

            state_array['update_flag'] = False
        return state_array

    def get_run_values(self):
        media_ml = 0
        alt_media_ml = 0
        waste_ml = 0
        temp_target = 0

        if self.dose_state_array['run_flag'] == True:
            media_ml = self.dose_state_array['run_target']
            self.dose_state_array['run_flag'] = False
        if self.salt_state_array['run_flag'] == True:
            alt_media_ml = self.salt_state_array['run_target']
            self.salt_state_array['run_flag'] = False
        if self.temp_state_array['run_flag'] == True:
            temp_target = self.temp_state_array['run_target']
        temp_target = self.temp_state_array['run_target']
        waste_ml = media_ml + alt_media_ml
        run_value_settings = {'media_ml': media_ml, 'alt_media_ml': alt_media_ml, 'waste_ml': waste_ml, 'temp_target': temp_target}
        return run_value_settings
    
    def calc_salt_dosing(self, diluted_molarity):
        # takes a target molarity and returns saltwater dose volume and inter [minutes/dose]
        # Settings assume 6.0 molarity NaCl-water solution
        solution_molarity = 6.0 # NaCl saturation point ~= 6.1 molarity in 20C H2O
        salt_dose_volume = 0.1 # Starting dose volume

        fraction = diluted_molarity/(solution_molarity - diluted_molarity) # V_salt = V_Nut * M_dil / (M_NaCl - M_dil) = V_Nut * fraction
        nutrient_volume_per_hour = 60*self.dose_state_array['run_target']/self.dose_state_array['run_interval']

        salt_volume_per_hour = nutrient_volume_per_hour * fraction
        salt_dose_interval = round(60*dose_volume/salt_volume_per_hour)
        while salt_dose_interval < 20:
            salt_dose_volume += 0.1
            salt_dose_interval = round(60*salt_dose_volume/salt_volume_per_hour)
        
        values = [salt_dose_volume, salt_dose_interval]
        return values

    def run_execute(self):
        self.dose_state_array = self.check_for_updates(self.dose_state_array)
        self.dose_state_array = self.update_target_values(self.dose_state_array)
        self.salt_state_array = self.check_for_updates(self.salt_state_array)
        self.salt_state_array = self.update_target_values(self.salt_state_array)
        self.temp_state_array = self.check_for_updates(self.temp_state_array)
        self.temp_state_array = self.update_target_values(self.temp_state_array)
        run_value_settings = self.get_run_values()
        return run_value_settings

Iā€™ll answer in no particular order:

Iā€™m guessing that by changing the experiment name in the GUI to match the experiment name in the automation makes the GUI pull up the correct data points

Thatā€™s right. Thereā€™s a function you should use to automatically pull the latest experiment name from the database to Python:

from pioreactor.whoami import get_latest_experiment_name
...

tc = TemperatureController(
        "thermostat",
        target_temperature=30,
        unit=get_unit_name(),
        experiment=get_latest_experiment_name() # will return "test_experiment"
        )
...
  1. How do I run my python script for multiple days without it shutting down if the terminal closes?

Better to be explicit and specify python like so:

nohup python3 /home/pioreactor/test_python_script.py &

no need for a shebang. And yes, your redirect looks correct. (All these logs are also saved to the database, when needed).

how can I find and kill the process after running it? Will this be sufficient?

Yup, that command looks correct. block_until_disconnected is waiting for a signal like what pgrep provides, so it will work as intended!

Later (or now if youā€™d like), we can put this scriptā€™s control into the UI, so you can start, and exit from the UI, similar to stirring, od reading, etc.

Iā€™m guessing that by changing the experiment name in the GUI to match the experiment name in the automation makes the GUI pull up the correct data points?

Yea, temperature data should show up when the experiment names match, i.e. you use get_latest_experiment_name.

  1. Also, is there any temperature controller behavior that I should know about when using the temperature controller? For example, what happens if I set the target temperature below room temperature? Will it just turn off the heater?

Some important notes about heating:

  1. Temperature data is only recorded every ~4m, so data is not-quite realtime.
  2. Reaching a target temperature can take up to 30m for a large delta. This is by design, so we donā€™t hugely overshoot and harm the culture.
  3. If the target_temp is below ambient (plus a bit), the heating will simply not activate (ie. we only do passive cooling). If you needed below ambient, an option is to place in a fridge, or cooling incubator.

Your code looks good! Hard to believe youā€™re new to Python! Also, watching you work on the problem helps us design our APIs to be more useful, so this is all very helpful to us.

1 Like

Awesome, good to know.

I tried running it last night but had an issue and a couple more questions, but Iā€™ll make a new thread for this new topic.