Hi @Johannes,
There are two ways to solve this, I think.
- You can re-implement the chemostat automation to include a pH reading at the end. This doesn’t use the full background-job code in the linked issue, except for the logic for reading from the sensor. I’ve marked the new lines with `# NEW` to give you an idea of what’s going on. (FYI: I haven’t run this code.)
# -*- coding: utf-8 -*-
from __future__ import annotations
import busio
from time import sleep
from pioreactor.automations import events
from pioreactor.automations.dosing.base import DosingAutomationJob
from pioreactor.exc import CalibrationError
from pioreactor.utils import local_persistent_storage
# also include the `parser` and `register_source_to_sink` code here.
# NEW
class pHReader():
    """Small I2C client that polls a pH sensor and averages the readings.

    Each reading is obtained by writing a text command to the device,
    waiting, and parsing the text reply as a float. The most recent
    averaged value is kept on ``self.pH``.
    """

    def __init__(self):
        self.i2c_channel = 0x40  # CHANGE THIS (I2C address of the sensor)
        # NOTE(review): arguments are presumably the SCL and SDA pins - confirm
        # against the busio documentation for your board.
        self.i2c = busio.I2C(3, 2)

    def read_pH(self):
        """Take a few readings, store their mean on ``self.pH`` and return it.

        Blocks while sampling: every sample costs one ``query`` round trip
        plus a 0.05 s pause.
        """
        samples = 2
        running_sum = 0.0
        for _ in range(samples):
            running_sum += self.query("R")
            sleep(0.05)
        self.pH = running_sum / samples
        # The original returned the bare name `pH`, which is undefined here
        # and raised NameError on every call.
        return self.pH

    def write(self, cmd):
        """Send ``cmd`` to the device as a null-terminated latin-1 byte string."""
        cmd_bytes = bytes(cmd + "\x00", "latin-1")  # Null-terminated command
        self.i2c.writeto(self.i2c_channel, cmd_bytes)

    @staticmethod
    def handle_raspi_glitch(response):
        """Clear the high bit (0x80) of every byte and return the characters."""
        return [chr(x & ~0x80) for x in response]

    def read(self, num_of_bytes=31):
        """Read ``num_of_bytes`` bytes from the device and parse them as a float.

        Zero bytes are dropped, the first remaining byte is discarded
        (presumably a status code - confirm against the sensor datasheet),
        and the rest is decoded as text.

        Raises:
            ValueError: if the remaining text is not a valid float.
        """
        result = bytearray(num_of_bytes)
        self.i2c.readfrom_into(self.i2c_channel, result)
        response = self.get_response(result)
        char_list = self.handle_raspi_glitch(response[1:])
        return float(''.join(char_list))

    def query(self, command):
        """Write ``command``, wait 1.5 s for the device, then return the parsed reply."""
        self.write(command)
        current_timeout = 1.5
        sleep(current_timeout)
        return self.read()

    @staticmethod
    def get_response(raw_data):
        """Return the bytes of ``raw_data`` as a list with all zero bytes removed."""
        return [i for i in raw_data if i != 0]
class ChemostatWithPHReading(DosingAutomationJob):  # NEW
    """
    Chemostat mode - try to keep [nutrient] constant.

    After every media/waste exchange, a pH reading is taken and stored on
    ``self.pH``, which is declared as a read-only published setting.
    """

    automation_name = "chemostat_with_ph_reading"  # NEW
    published_settings = {
        "volume": {"datatype": "float", "settable": True, "unit": "mL"},
        "pH": {"datatype": "float", "settable": False},  # NEW
    }

    def __init__(self, volume: float | str, **kwargs) -> None:
        """Check pump calibrations, store the exchange volume, create the pH reader.

        Args:
            volume: volume to exchange per run; converted with ``float()``.
            **kwargs: forwarded unchanged to ``DosingAutomationJob``.

        Raises:
            CalibrationError: if "media_pump" or "waste_pump" is missing from
                the "active_calibrations" storage.
        """
        super().__init__(**kwargs)

        with local_persistent_storage("active_calibrations") as cache:
            if "media_pump" not in cache:
                raise CalibrationError("Media and waste pump calibration must be performed first.")
            elif "waste_pump" not in cache:
                raise CalibrationError("Media and waste pump calibration must be performed first.")

        self.volume = float(volume)
        self.pHReader = pHReader()  # NEW

    def execute(self) -> events.DilutionEvent:
        """Exchange ``self.volume`` of media and waste, then record the pH."""
        volume_actually_cycled = self.execute_io_action(media_ml=self.volume, waste_ml=self.volume)
        # Read through the instance created in __init__. The original called
        # `pHReader.read_pH()` on the class, which fails for lack of `self`.
        self.pH = self.pHReader.read_pH()  # NEW
        return events.DilutionEvent(
            f"exchanged {volume_actually_cycled['waste_ml']}mL",
            data={"volume_actually_cycled": volume_actually_cycled["waste_ml"]},
        )
This Python file would live in `~/.pioreactor/plugins/`, and it essentially creates a new automation called `chemostat_with_ph_reading`.
You’ll also have to add a new automation YAML to the `~/.pioreactor/plugins/ui/contrib/automations/dosing` folder for the `chemostat_with_ph_reading` automation to get it to display in the dropdown.
- The other option is to modify the background-job class in that linked issue to listen for remove_waste events:
from pioreactor import structs # NEW
....
class PHReader(BackgroundJobContrib):
...
def __init__(self, ....):
...
self.start_passive_listeners()
def start_passive_listeners(self):
    """Subscribe to the dosing-events topic for this unit and experiment.

    Incoming messages are routed to ``respond_to_dosing_event_from_mqtt``;
    retained messages are not accepted.
    """
    # topic carrying the dosing events we want to react to
    dosing_events_topic = f"pioreactor/{self.unit}/{self.experiment}/dosing_events"
    self.subscribe_and_callback(
        self.respond_to_dosing_event_from_mqtt,
        dosing_events_topic,
        qos=QOS.EXACTLY_ONCE,
        allow_retained=False,
    )
def respond_to_dosing_event_from_mqtt(self, message: pt.MQTTMessage) -> None:
    """Take a pH reading whenever a "remove_waste" dosing event arrives.

    The message payload is decoded as a ``structs.DosingEvent``; any other
    event kind is ignored.
    """
    event = decode(message.payload, type=structs.DosingEvent)
    if event.event != "remove_waste":
        return
    self.read_pH()
You can also remove all the “timer” logic, since you don’t need it - you’ll be responding to the remove_waste events instead. Does that make sense?
I’m happy to help more if my solutions aren’t clear!