Killing a job started from within an automation also kills the automation

I am trying to develop an automation that executes a blanking phase for Kalman filter initialization and nOD baselining at the beginning of the automation.

Our OD reading interval is around once every 5 minutes, due to the high rate of stirring needed. With the default setup, blanking and initialization therefore take quite some time, which can be problematic, especially for the nOD.
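
To put rough numbers on this, here is a minimal sketch. It assumes the baseline simply needs a fixed number of consecutive OD readings (35 is the default for samples_for_od_statistics mentioned later in this thread) and ignores any other startup delay. `baseline_duration_s` is a hypothetical helper for illustration, not a Pioreactor function.

```python
def baseline_duration_s(samples: int, interval_s: float) -> float:
    """Approximate time to collect `samples` readings taken every `interval_s` seconds."""
    return samples * interval_s


# 35 samples at one reading per 5 minutes (300 s): 10500 s, i.e. 175 minutes
slow = baseline_duration_s(35, 300)
# 35 samples at one reading per 2 seconds: 70 s
fast = baseline_duration_s(35, 2)

print(f"slow: {slow / 60:.0f} min, fast: {fast:.0f} s")
```

So at our interval the baseline alone would take close to three hours, compared with about a minute at a 2 second interval.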

The goal was to start an od_reading job with an increased sampling rate to allow for fast blanking and filter initialization, then kill that job and restart it with the interval defined in the config.

The problem is that if the od_reading job is killed through the job manager, by job_id or job_name, this “ripples” up and also kills the automation itself. Also, when the job is stopped through the UI it does not seem to be stopped properly, as restarting it is not possible due to duplication.

Also if the od_reading job crashes this also kills the automation.

All in all it seems like starting the jobs from within a long running automation is not the intended way.

Is there a way to start a job with custom parameters that is not tethered to the automation itself / integrating nicely with the UI / job manager?

Best, Kai

Hi @Jackd4w,

I think there are a few options you can try here¹

  1. There is a configuration parameter samples_for_od_statistics that can be changed to set how many samples are needed for the baseline nOD. The default is 35. In your case, keep your interval long, and maybe you want 3-5 samples?
  2. You can programmatically change the OD sampling interval during its run. From within the job:
    od_reader.record_from_adc_timer.interval = your_new_value
    
  3. Killing jobs will take down the entire Python process, so you probably don’t want to kill jobs. Instead, you can gracefully end a job by using the with syntax. Example:
from pioreactor.background_jobs.od_reading import start_od_reading
from pioreactor.config import config
...

# put this where you need fast od samples
with start_od_reading(
     od_angle_channel1=config.get("od_config.photodiode_channel", "1", fallback=None),
     od_angle_channel2=config.get("od_config.photodiode_channel", "2", fallback=None),
     interval=5) as fast_od:
     # do other stuff, or just sleep
     pass  # placeholder so the block is valid Python; replace with your blanking logic

# once we get here, OD reading has gracefully cleaned itself up and you can start OD reading again.
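
As a rough illustration of why the with form shuts a job down cleanly, here is a stdlib-only sketch. `FakeJob` is a hypothetical stand-in, not the Pioreactor class. The only things carried over from this thread are that a job can be used as a context manager and exposes a clean_up() method.

```python
class FakeJob:
    """Hypothetical stand-in for a background job with a clean_up() method."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.state = "ready"

    def clean_up(self) -> None:
        # A real job would release hardware and connections here;
        # this stand-in only records the state change.
        self.state = "disconnected"

    def __enter__(self) -> "FakeJob":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Runs on normal exit and also when the body raises.
        self.clean_up()
        return False  # do not swallow exceptions


with FakeJob("od_reading") as fast_od:
    assert fast_od.state == "ready"  # the job is alive inside the block

print(fast_od.state)  # disconnected
```

The point is that nothing is killed from the outside: the job's own cleanup runs when the block ends, so the surrounding Python process keeps running.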

It might help to share your current code, either here or by emailing us at info@pioreactor.com, too.


¹ This sounds like a pretty common request, so I think in a future release, I’ll make this easier by allowing the UI to update interval via the published_settings pipeline.

Hey Cameron,

thanks for the tips :slight_smile: Is the second option you mentioned also taking effect during a running job?

Regarding number 3, I didn’t create the context through a with statement because that seems to me only workable for a short-lived job. It would work for blanking, but after the blanking, how would the OD reading service be kept running for the whole duration of the automation without user intervention and without “endangering” the automation process tree if the od_reading job crashes or is killed?

I had a test run of the automation end because the od_reading job started by the automation encountered an error and shut down which in turn ended the whole process tree.

    def _blanking(self):
        if self._blanked:
            return

        # TODO: Add log message for user that blanking is in progress and settings shouldn't be changed

        # In order to start blanking we first clean up all running jobs to create a known environment
        if is_pio_job_running("growth_rate_calculating"):
            with JobManager() as jm:
                jm.kill_jobs(
                    job_name="growth_rate_calculating"
                )
                self.logger.debug("Killed previous growth_rate_calculating job")

        if is_pio_job_running("od_reading"):
            with JobManager() as jm:
                jm.kill_jobs(job_name="od_reading")
                self.logger.debug("Killed previous od_reading job")

        # Now we set up the jobs in a way we want them:
        # Assuming a running stirring job, reducing the rpms. How to deal with dodging?
        # OD reading every 2 seconds (with 35 measurements for growth rate that's 70 seconds)
        # Growth rate with reset cache so no previous data taints our results

        self._set_rpm(500)  # Set rpms to 500

        self.od_reader = start_od_reading(
            config.get("od_config.photodiode_channel", "1", fallback=None),
            config.get("od_config.photodiode_channel", "2", fallback=None),
            2,  # One reading every 2 seconds
            calibration=load_active_calibration("od"),
        )
        self.logger.debug("Started OD job with 2 second interval for blanking.")

        self.growth_rate_calculator = GrowthRateCalculator(
            ignore_cache=True,  # Redo the initial blanks on start of the job
            unit=self.unit,
            experiment=self.experiment,
        )

        # We check if growth rate has reinitialized its baseline, otherwise we wait
        while not hasattr(self.growth_rate_calculator, "ekf"):
            time.sleep(5)

        self.logger.debug("Growth Rate Kalman filter has been initialized")

        # Now that a blank has been established we can go back to jobs as they are defined in the config
        # Interval for OD reading job cannot be changed, therefore it's restarted.

        self.od_reader.clean_up()

        if self._debug:
            self.logger.debug(
                config.get("od_reading.config", "samples_per_second", fallback=0.2)
            )

        # Now that the blanking is done, an od_reader job with the parameters defined in the config needs to run for the remainder of the experiment -> couple of weeks
        self.od_reader = start_od_reading(
            config.get("od_config.photodiode_channel", "1", fallback=None),
            config.get("od_config.photodiode_channel", "2", fallback=None),
            1
            / float(
                config.get("od_reading.config", "samples_per_second", fallback=0.2)
            ),  # Fallback is once per 5 seconds
            calibration=load_active_calibration("od"),
        )

        self.logger.debug("Started config OD reading job")

        # Stirring job can be changed so we take that value from config
        self._set_rpm(config.getfloat("stirring.config", "target_rpm", fallback=400))

        self._blanked = True  # Set the flag so we do not blank again if returning to ready state; i.e. resuming from pause

    def on_disconnected(self) -> None:
        # If we end the dosing automation it's important that we clean up everything we started in this context
        if self.od_reader is not None:
            self.od_reader.clean_up()
        if self.growth_rate_calculator is not None:
            self.growth_rate_calculator.clean_up()
        super(PALE_ALE, self).on_disconnected()

This would be the blanking method and the on_disconnected method of the dosing automation I originally wrote and had problems with. I removed the creation of a stirring job from this, as it was causing problems similar to those I described for the od_reading job.
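
One detail in the loop that waits for `ekf`: if OD readings stop arriving for any reason, `while not hasattr(...)` never exits. A minimal sketch of a bounded wait follows. `wait_until` is a hypothetical helper, not part of Pioreactor, and the timeout and poll values are placeholders.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool], timeout_s: float, poll_s: float = 5.0) -> bool:
    """Poll `predicate` until it returns True or `timeout_s` elapses.

    Returns True if the predicate became true, False on timeout.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        # Never sleep past the deadline.
        time.sleep(min(poll_s, max(0.0, deadline - time.monotonic())))
```

Inside `_blanking` the predicate could be `lambda: hasattr(self.growth_rate_calculator, "ekf")`, with a log message and an early exit when the call returns False.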

A second thing regarding point two: Is there a way to get an already running job as an instance into the automation, via the job manager for example?

Because that line of code would only work if the instance of the job was started by the automation, correct? So if the job was started by the user through the UI, I don’t see an obvious way to interface with that instance, changing the interval for example (other than through published settings and MQTT).

So, I think the first step is getting interval to be an editable published_setting. Then anyone (UI, other jobs/automations, profiles) can edit the interval as they wish.

I’ve coded this up for the next release (coming today or tomorrow). Be sure to be on version 25.1.21 to be able to update.


If you want to view the interval in the UI, try:

  1. nano /var/www/pioreactorui/contrib/jobs/01_od_reading.yaml
  2. Edit/add the following
published_settings:
  - key: interval
    unit: s
    label: OD Interval
    description: Change the OD reading interval.
    type: numeric
    display: true
  3. ctrl-x to save and exit.
  4. Refresh the UI, and here’s a demo:

This only works after the update to the next release (this is my development version).

To change the interval programmatically, you can PATCH to

http://pioreactor.local/api/workers/<pioreactor_unit>/jobs/update/job_name/od_reading/experiments/<experiment>

with body (for example):

   {
     "settings": {
       "interval": 10
     }
   }

In python, this looks like:

from pioreactor.pubsub import patch_into_leader

...

r = patch_into_leader(
   f"/api/workers/{unit}/jobs/update/job_name/od_reading/experiments/{experiment}", 
   json={"settings": {"interval": 5}}
)

So when your automation starts, it can run code like the above: OD reading switches to a “fast reading” interval, you get your baseline done, and then you change the interval back to “slow reading”.
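
The fast-then-slow sequence could be wrapped roughly like this. This is only a sketch under stated assumptions: `temporary_od_interval` is a hypothetical helper, the `send` callable stands in for patch_into_leader (called the same way as in the snippet above), and "unit1" and "exp1" are placeholder names. It also assumes a release in which interval is an editable published setting.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator


@contextmanager
def temporary_od_interval(
    send: Callable[..., Any],
    unit: str,
    experiment: str,
    fast_s: float,
    slow_s: float,
) -> Iterator[None]:
    """Set the OD interval to `fast_s`, then restore `slow_s` on exit.

    `send` is called as send(endpoint, json=...). In real use you would pass
    patch_into_leader; it is injected here so the helper can be tried without
    a running leader.
    """
    endpoint = f"/api/workers/{unit}/jobs/update/job_name/od_reading/experiments/{experiment}"
    send(endpoint, json={"settings": {"interval": fast_s}})
    try:
        yield
    finally:
        # Restore the slow interval even if baselining raised an error.
        send(endpoint, json={"settings": {"interval": slow_s}})


# Dry run with a recording stand-in instead of patch_into_leader.
sent = []
with temporary_od_interval(
    lambda endpoint, json: sent.append(json), "unit1", "exp1", fast_s=2, slow_s=300
):
    pass  # wait for the baseline / Kalman filter here

print(sent)
```

Because the OD reading job itself is never stopped, nothing needs to be killed or restarted from within the automation.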

Would that be enough to solve your problem?

Hey @CamDavidsonPilon

yes that looks like exactly what I need! Thank you so much :smile:

Best,

Kai