“Observer” to start pH Measurement

Hi @Johannes,

There are two ways to solve this, I think.

  1. You can re-implement the chemostat automation to include a pH reading at the end. This doesn’t use the full background-job code in the linked issue, except for the logic for reading from the sensor. I’ve marked the new lines with #NEW to give you an idea of what’s going on. (FYI I haven’t run this code)
# -*- coding: utf-8 -*-
from __future__ import annotations
import busio
from time import sleep

from pioreactor.automations import events
from pioreactor.automations.dosing.base import DosingAutomationJob
from pioreactor.exc import CalibrationError
from pioreactor.utils import local_persistent_storage

# also include the `parser` and `register_source_to_sink` code here. 

# NEW
class pHReader():
    """
    Small I2C client that queries a pH probe and averages a few readings.

    Commands are written as null-terminated latin-1 strings; the reply is
    read back as raw bytes and parsed into a float. The latest averaged
    value is kept on ``self.pH``.
    """

    def __init__(self):
        self.i2c_channel = 0x40 # CHANGE THIS 
        # NOTE(review): presumably (3, 2) are the SCL/SDA pins - confirm for your wiring.
        self.i2c = busio.I2C(3,2)

    def read_pH(self, samples=2):
        """
        Query the probe ``samples`` times and return the mean reading.

        The mean is also stored on ``self.pH``.

        Raises:
            ValueError: if ``samples`` is less than 1.
        """
        if samples < 1:
            raise ValueError("samples must be at least 1")

        running_sum = 0.0
        for _ in range(samples):
            running_sum += self.query("R")
            sleep(0.05)  # brief pause between consecutive queries

        self.pH = running_sum / samples
        # The original returned the bare name `pH`, which is undefined here.
        return self.pH

    def write(self, cmd):
        """Send ``cmd`` to the device as a null-terminated latin-1 byte string."""
        cmd_bytes = bytes(cmd + "\x00", "latin-1")  # Null-terminated command
        self.i2c.writeto(self.i2c_channel, cmd_bytes)

    @staticmethod
    def handle_raspi_glitch(response):
        """Clear the high bit (0x80) of every byte and return the bytes as characters."""
        return [chr(x & ~0x80) for x in response]

    def read(self, num_of_bytes=31):
        """
        Read up to ``num_of_bytes`` bytes from the device and parse them as a float.

        Zero bytes are dropped and the first remaining byte is skipped
        (presumably a status code - confirm against the sensor datasheet).

        Raises:
            ValueError: if the remaining characters do not form a valid float.
        """
        result = bytearray(num_of_bytes)
        self.i2c.readfrom_into(self.i2c_channel, result)
        response = self.get_response(result)
        char_list = self.handle_raspi_glitch(response[1:])
        return float(''.join(char_list))

    def query(self, command):
        """Write ``command``, wait 1.5 seconds, then read and return the parsed reply."""
        self.write(command)
        current_timeout = 1.5  # seconds to wait before reading the reply back
        sleep(current_timeout)
        return self.read()

    @staticmethod
    def get_response(raw_data):
        """Return the bytes of ``raw_data`` with all zero (padding) bytes removed."""
        return [i for i in raw_data if i != 0]


class ChemostatWithPHReading(DosingAutomationJob):  # NEW
    """
    Chemostat mode - try to keep [nutrient] constant.

    After each media/waste exchange, a pH reading is taken and stored on
    ``self.pH``, which is listed in ``published_settings``.
    """

    automation_name = "chemostat_with_ph_reading" # NEW
    published_settings = {
        "volume": {"datatype": "float", "settable": True, "unit": "mL"},
        "pH": {"datatype": "float", "settable": False}, # NEW
    }

    def __init__(self, volume: float | str, **kwargs) -> None:
        """
        Args:
            volume: amount (mL) of media to add, and waste to remove, per run.
            **kwargs: forwarded unchanged to ``DosingAutomationJob``.

        Raises:
            CalibrationError: if either the media or the waste pump has no
                active calibration stored.
        """
        super().__init__(**kwargs)

        with local_persistent_storage("active_calibrations") as cache:
            # Both pumps must be calibrated; the message is the same either way.
            if "media_pump" not in cache or "waste_pump" not in cache:
                raise CalibrationError("Media and waste pump calibration must be performed first.")

        self.volume = float(volume)
        self.pHReader = pHReader()  # NEW

    def execute(self) -> events.DilutionEvent:
        """
        Exchange ``self.volume`` mL of media and waste, then read the pH.

        Returns:
            A ``DilutionEvent`` reporting the waste volume actually cycled.
        """
        volume_actually_cycled = self.execute_io_action(media_ml=self.volume, waste_ml=self.volume)

        # Read through the instance created in __init__; calling read_pH on the
        # class itself would fail because no `self` is supplied.
        self.pH = self.pHReader.read_pH() # NEW

        return events.DilutionEvent(
            f"exchanged {volume_actually_cycled['waste_ml']}mL",
            data={"volume_actually_cycled": volume_actually_cycled["waste_ml"]},
        )

This python file would live in ~/.pioreactor/plugins/, and it essentially creates a new automation called chemostat_with_ph_reading.

You’ll also have to add a new automation YAML to the ~/.pioreactor/plugins/ui/contrib/automations/dosing folder for the chemostat_with_ph_reading automation to get it to display in the dropdown.

  2. The other option is to modify the background-job class in that linked issue to listen for remove_waste events:
from pioreactor import structs # NEW

 ....
class PHReader(BackgroundJobContrib):
     ...

    def __init__(self, ....):

       ...
       self.start_passive_listeners()

    def start_passive_listeners(self):
        """
        Subscribe to this unit/experiment's ``dosing_events`` MQTT topic.

        Each incoming message is passed to
        ``respond_to_dosing_event_from_mqtt``. Retained messages are not
        delivered (``allow_retained=False``).
        """
        # process incoming data
        self.subscribe_and_callback(
            self.respond_to_dosing_event_from_mqtt,
            f"pioreactor/{self.unit}/{self.experiment}/dosing_events",
            qos=QOS.EXACTLY_ONCE,
            allow_retained=False,
        )


    def respond_to_dosing_event_from_mqtt(self, message: pt.MQTTMessage) -> None:
        """
        Decode a dosing-event message and call ``read_pH`` on waste removal.

        The payload is decoded into a ``structs.DosingEvent``; only events
        whose ``event`` field equals "remove_waste" trigger ``self.read_pH()``.
        All other events are ignored.
        """
        dosing_event = decode(message.payload, type=structs.DosingEvent)
        if dosing_event.event == "remove_waste":
          # NOTE(review): read_pH is defined in the elided part of the class - confirm it stores/publishes the value.
          self.read_pH()

        return

You can also remove all “timer” logic, since you don’t need it - you’ll be responding to the remove_waste events. Does that make sense?


I’m happy to help more if my solutions aren’t clear!