“Observer” to start pH Measurement

Hi,

While using the pioreactor as a chemostat we want to measure the pH in a flow through cell. The pH measurement should only take place after the “remove_waste_from_bioreactor” function is invoked.

I guess I need something like this:

But not sure how to implement this.

The automation we use for pH-measurements is this.

Regards

Johannes

Hi @Johannes,

There are two ways to solve this, I think.

  1. You can re-implement the chemostat automation to include a pH reading at the end. This doesn’t use the full background-job code in the linked issue, except for the logic for reading from the sensor. I’ve marked the new lines with #NEW to give you an idea of what’s going on. (FYI I haven’t run this code)
# -*- coding: utf-8 -*-
from __future__ import annotations
import busio
from time import sleep

from pioreactor.automations import events
from pioreactor.automations.dosing.base import DosingAutomationJob
from pioreactor.exc import CalibrationError
from pioreactor.utils import local_persistent_storage

# also include the `parser` and `register_source_to_sink` code here. 

# NEW
class pHReader():

    def __init__(self):
        self.i2c_channel = 0x40 # CHANGE THIS 
        self.i2c = busio.I2C(3,2)

    def read_pH(self):
        samples = 2
        running_sum = 0.0
        for _ in range(samples):
            running_sum += self.query("R")
            sleep(0.05)

        self.pH = running_sum/samples
        return pH

    def write(self, cmd):
        cmd_bytes = bytes(cmd + "\x00", "latin-1")  # Null-terminated command
        self.i2c.writeto(self.i2c_channel, cmd_bytes)

    @staticmethod
    def handle_raspi_glitch(response):
        return [chr(x & ~0x80) for x in response]

    def read(self, num_of_bytes=31):
        result = bytearray(num_of_bytes)
        self.i2c.readfrom_into(self.i2c_channel, result)
        response = self.get_response(result)
        char_list = self.handle_raspi_glitch(response[1:])
        return float(''.join(char_list))

    def query(self, command):
        self.write(command)
        current_timeout = 1.5
        sleep(current_timeout)
        return self.read()

    @staticmethod
    def get_response(raw_data):
        return [i for i in raw_data if i != 0]


class ChemostatWithPHReading(DosingAutomationJob):  # NEW
    """
    Chemostat mode - try to keep [nutrient] constant.
    """

    automation_name = "chemostat_with_ph_reading" # NEW
    published_settings = {
        "volume": {"datatype": "float", "settable": True, "unit": "mL"},
        "pH": {"datatype": "float", "settable": False}, # NEW
    }

    def __init__(self, volume: float | str, **kwargs) -> None:
        super().__init__(**kwargs)

        with local_persistent_storage("active_calibrations") as cache:
            if "media_pump" not in cache:
                raise CalibrationError("Media and waste pump calibration must be performed first.")
            elif "waste_pump" not in cache:
                raise CalibrationError("Media and waste pump calibration must be performed first.")

        self.volume = float(volume)
        self.pHReader = pHReader()  # NEW

    def execute(self) -> events.DilutionEvent:
        volume_actually_cycled = self.execute_io_action(media_ml=self.volume, waste_ml=self.volume)

        self.pH = pHReader.read_pH() # NEW

        return events.DilutionEvent(
            f"exchanged {volume_actually_cycled['waste_ml']}mL",
            data={"volume_actually_cycled": volume_actually_cycled["waste_ml"]},
        )

This python file would live in ~/.pioreactor/plugins/, and it essentially creates a new automation called chemostat_with_ph_reading.

You’ll also have to add a new automation YAML to the ~/.pioreactor/plugins/ui/contrib/automations/dosing folder for the chemostat_with_ph_reading automation to get it to display in the dropdown.

  1. The other option is to modify the background-job class in that linked issue to listen for remove_waste events:
from pioreactor import structs # NEW

 ....
class PHReader(BackgroundJobContrib):
     ...

    def __init__(self, ....):

       ...
       self.start_passive_listeners()

    def start_passive_listeners(self):
        # process incoming data
        self.subscribe_and_callback(
            self.respond_to_dosing_event_from_mqtt,
            f"pioreactor/{self.unit}/{self.experiment}/dosing_events",
            qos=QOS.EXACTLY_ONCE,
            allow_retained=False,
        )


    def respond_to_dosing_event_from_mqtt(self, message: pt.MQTTMessage) -> None:
        dosing_event = decode(message.payload, type=structs.DosingEvent)
        if dosing_event.event == "remove_waste":
          self.read_pH()

        return

You can also remove all “timer” logic, since you don’t need it - you’ll be responding the remove_waste events. Does that make sense?


I’m happy to help more if my solutions aren’t clear!

Hi Cam,
thanks for the support. Really appreciate it!
Got it working (choosing the first approach).

Our Piorector now has a scale, a mass flow controller, a Do sensor and a pH sensor attached to it.
Neat :slight_smile: