Help with basic MQTT example

I’ve been reading through the documentation and your other post about MQTT, but I think at this point I might just need to see some basic examples/sketches of how to implement it, and then I can start experimenting on my own.

Ex: I’m running pioreactor1 on a Leader+Worker image. I just manually started the default chemostat dosing automation. I want to create a basic script at ~/my_script.py (hosted on pioreactor1), and when I run this script, it will: (1) get the current volume or duration from the default chemostat; and (2) update the current published settings so volume = 1 and duration = 2.

I am not sure how to do the first, but I believe I can do the second as shown below.

from pioreactor.background_jobs.base import BackgroundJobContrib
from pioreactor.whoami import get_unit_name, get_latest_experiment

class Brains(BackgroundJobContrib):

   def __init__(self):
      ...  # constructor body not written yet

   def set_value(self, attribute, value):
      self.publish(f"pioreactor/pioreactor1/{get_latest_experiment()}/dosing_automation/{attribute}/set", value)

   def get_value(self, attribute) -> int | float:
      # Retrieve value of 'attribute' from pioreactor1 here.
      ...

b = Brains()

b.get_value("volume") # returns the value of pioreactor1's published attribute "volume"
b.set_value("volume", 1.5) # sets published attribute "volume" to 1.5
b.block_until_disconnected()

I am wondering how I might accomplish this by directly asking for the value of a published attribute. Is this currently possible?

I understand that the leader acts as the MQTT broker, and I am picturing this as a message board (hosted by the leader) from which all pioreactors can post messages to and read messages from. I’m not sure if this message board also stores any other data, such as the most recent values of published settings or a list of all pioreactors that are currently connected to the MQTT broker.

Edit: I ran pio mqtt -t "pioreactor/+/+/+/#" and this looks relevant.

pio mqtt -t "pioreactor/+/+/+/volume/#"

2023-03-17T12:27:35 | pioreactor/pioreactor1/experiment/dosing_automation/volume 0.0
2023-03-17T12:27:35 | pioreactor/pioreactor1/experiment/dosing_automation/volume/$settable True
2023-03-17T12:27:35 | pioreactor/pioreactor1/experiment/dosing_automation/volume/$datatype float
2023-03-17T12:27:35 | pioreactor/pioreactor1/experiment/dosing_automation/volume/$unit mL

Here I can see that “pioreactor/pioreactor1/experiment/dosing_automation/duration 20.0” shows that the value of duration is 20. However, I found this information by running the “pio mqtt -t *” command from terminal. I’m not sure how I would do the same thing from a script, and save the value ‘20’ to a variable.

Yea! This is done with the pioreactor.pubsub.subscribe function:

from pioreactor.pubsub import subscribe
...

   def get_value(self, attribute) -> str | None:
      msg_or_None = subscribe(f"pioreactor/pioreactor1/{get_latest_experiment()}/dosing_automation/{attribute}", timeout=1)
      if msg_or_None is not None:
         value = msg_or_None.payload.decode() # this is now a string, you may need to call float(...) etc to convert it.
      else:
         value = None
      return value

The subscribe function blocks, so we want to add a timeout so that it doesn’t wait forever if we do something silly like: .get_value("rgewgrw")
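
Putting the pieces together, here is a minimal sketch of reading a published setting into a variable. So that it runs without a broker, the subscribe call is passed in as a parameter and exercised with a stand-in. The names read_float_setting and fake_subscribe are hypothetical; on a Pioreactor you would pass pioreactor.pubsub.subscribe instead.

```python
from types import SimpleNamespace
from typing import Callable, Optional


def read_float_setting(subscribe: Callable, topic: str, timeout: float = 1.0) -> Optional[float]:
    """Return the latest value on `topic` as a float, or None if nothing arrives in time."""
    msg = subscribe(topic, timeout=timeout)
    if msg is None or not msg.payload:
        return None
    return float(msg.payload.decode())


def fake_subscribe(topic: str, timeout: Optional[float] = None):
    """Hypothetical stand-in for pioreactor.pubsub.subscribe: a message-like object or None."""
    retained = {
        "pioreactor/pioreactor1/experiment/dosing_automation/duration": b"20.0",
    }
    payload = retained.get(topic)
    if payload is None:
        return None  # mimics the timeout case
    return SimpleNamespace(topic=topic, payload=payload)


duration = read_float_setting(
    fake_subscribe, "pioreactor/pioreactor1/experiment/dosing_automation/duration"
)
missing = read_float_setting(
    fake_subscribe, "pioreactor/pioreactor1/experiment/dosing_automation/rgewgrw"
)
print(duration, missing)
```

Returning None for the timeout case keeps a typo in the attribute name from looking like a real reading of zero.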

I understand that the leader acts as the MQTT broker, and I am picturing this as a message board (hosted by the leader) from which all pioreactors can post messages to and read messages from.

This is a pretty good mental image. It’s live though, so pioreactors can’t read past messages, only messages published after they start listening. (Note: the retain flag on a message changes this: the broker keeps the most recent retained message on each topic and delivers it to new subscribers. All published_settings attributes use retain=True.)
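
To make the retain behaviour concrete, here is a toy in-memory model of it. This is only a sketch: ToyBroker is a hypothetical class, it matches topics exactly (no wildcards), and it is not how a real MQTT broker is implemented.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple


class ToyBroker:
    """A toy model of retain semantics; not a real MQTT broker."""

    def __init__(self) -> None:
        self.retained: Dict[str, str] = {}
        self.subscribers: Dict[str, List[Callable[[str, str], None]]] = defaultdict(list)

    def publish(self, topic: str, payload: str, retain: bool = False) -> None:
        if retain:
            # Only the most recent retained message per topic is kept.
            self.retained[topic] = payload
        for callback in self.subscribers[topic]:
            callback(topic, payload)

    def subscribe(self, topic: str, callback: Callable[[str, str], None]) -> None:
        self.subscribers[topic].append(callback)
        if topic in self.retained:
            # A late subscriber immediately receives the retained message.
            callback(topic, self.retained[topic])


broker = ToyBroker()
broker.publish("job/volume", "0.5", retain=True)
broker.publish("job/volume", "1.0", retain=True)
broker.publish("job/log", "dosing started")  # not retained, so it is lost

seen: List[Tuple[str, str]] = []
broker.subscribe("job/volume", lambda t, p: seen.append((t, p)))
broker.subscribe("job/log", lambda t, p: seen.append((t, p)))
print(seen)
```

The late subscriber sees only the last retained volume and never sees the earlier volume or the unretained log line.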

a list of all pioreactors that are currently connected to the MQTT broker.

For this, you probably want to use the function pioreactor.config.get_active_workers_in_inventory - that should be equal to who’s connected. It’s derived from the inventory section in the config.ini.

Cool, I think this is starting to click for me. I hadn’t realized that it can’t read past messages, and that only the most recent message is retained. I have a few remaining questions if you don’t mind.

  • Question 1

value = msg.payload.decode() # this is now a string, you may need to call float(…) etc to convert it.

I need to import json for this, correct? I saw this in the last thread and had never seen the .msg function (object? class?) before, so I assumed it came from importing json. (just double checking my understanding of what is going on)

  • Question 2
    I type this into Terminal:

pioreactor@pioreactor1:~ $ pio mqtt -t "pioreactor/+/+/+/volume/#"
2023-03-17T12:44:37 | pioreactor/pioreactor1/experiment/dosing_automation/volume/$settable True
2023-03-17T12:44:37 | pioreactor/pioreactor1/experiment/dosing_automation/volume/$datatype float
2023-03-17T12:44:37 | pioreactor/pioreactor1/experiment/dosing_automation/volume/$unit mL
2023-03-17T12:44:37 | pioreactor/pioreactor1/experiment/dosing_automation/volume 0.0

pioreactor@pioreactor1:~ $ pio mqtt -t "pioreactor/+/+/+/volume"
2023-03-17T13:00:58 | pioreactor/pioreactor1/experiment/dosing_automation/volume 0.0

pioreactor@pioreactor1:~ $ pio mqtt -t "pioreactor/+/+/+/volume/$settable"
[nothing]
pioreactor@pioreactor1:~ $ pio mqtt -t "pioreactor/+/+/+/volume/settable"
[nothing]

I can see attributes (and their values) like “$settable” when I run pio mqtt -t "pioreactor/+/+/+/volume/#", and I can (only) see the value of “volume” when I run pio mqtt -t "pioreactor/+/+/+/volume". Intuitively, I think I should be able to see the value of attribute $settable by running pio mqtt -t "pioreactor/+/+/+/volume/$settable", but this doesn’t seem to be working. I see that $ is an internal statistic of the MQTT broker, but I don’t understand why pio mqtt -t "pioreactor/+/+/+/volume/#" will display all the $internal_statistics, but pio mqtt -t "pioreactor/+/+/+/volume/$settable" won’t display this:

2023-03-17T12:44:37 | pioreactor/pioreactor1/experiment/dosing_automation/volume/$settable True

I’m curious about why it is behaving this way. Is this just a syntax error on my end, or is there something else going on?

  • Question 3

(Pretend this is all strikethrough text)

pioreactor.config.get_active_workers_in_inventory - that should be equal to who’s connected. It’s derived from the inventory section in the config.ini.

This returns a tuple, but I wouldn’t know how to figure that out without access to the source code. In my (very limited) experience with IDEs, they tend to have a “console” or “terminal” window where you can enter code directly and have it run. Is there an equivalent on Raspberry Pis/Vim?

For example, with a “Python console”, I could enter the following commands:
from pioreactor.config import get_active_workers_in_inventory
variable = get_active_workers_in_inventory()
print(variable)
print(type(variable))

Currently, my only method of running Python code is by saving it in a script and then running that script. Is there another way of entering code to test it out?

After a bit of googling, I think I can just type python3 into Terminal without any arguments. Also, is there a way to strikethrough multiple paragraphs?

  • Question 4

From the default config.

[ui]
# the UI will be available at the below alias, along with <leader_hostname>.local
# note - it must end in .local, and subdomains are not allowed.
# requires a reboot after changing.
domain_alias=pioreactor.local

If I have two leaders, how is a conflict between the two handled if they both have pioreactor.local as a domain alias (e.g., first come, first served)? Can I just ignore any potential conflicts and access each leader from their specific hostname?

Question 1.

You may need to import json and use json.loads. It depends on whether the payload string is a JSON string or not. Ex: volume for a dosing automation is just a string (representing a float), so you don’t need json. But for some attributes you do need json: something like json.loads(msg.payload.decode()) to turn it into a dictionary.
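
As a small illustration of the two cases, using made-up payloads (the byte strings below are examples, not captured from a broker):

```python
import json

# A plain numeric setting: the payload is just the text of a float.
volume_payload = b"0.0"
volume = float(volume_payload.decode())

# A structured setting: the payload is a JSON document.
growth_rate_payload = b'{"growth_rate": 0.0155, "timestamp": "2023-03-11T22:58:06.796459Z"}'
growth_rate = json.loads(growth_rate_payload.decode())

print(volume)
print(growth_rate["growth_rate"])
```

In both cases .decode() only turns bytes into a string; the conversion to a float or a dictionary is a separate step.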

Question 2.

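Oh this is subtle and has bit me before. It’s actually your terminal that is messing up the $. Inside double quotes, the shell treats $settable as a variable and replaces it with its (empty) value before pio ever sees it. Use single quotes instead of double quotes: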

pio mqtt -t 'pioreactor/+/+/+/volume/$settable'
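
If you want to see the difference for yourself, here is a sketch that asks sh to echo back each quoted argument. It assumes a POSIX sh is available; shell_sees is a hypothetical helper, and the variable is unset first so the result does not depend on your environment.

```python
import subprocess


def shell_sees(quoted_argument: str) -> str:
    """Return the argument as the shell hands it to a program, after quoting and expansion."""
    script = "unset settable; printf %s " + quoted_argument
    result = subprocess.run(["sh", "-c", script], capture_output=True, text=True, check=True)
    return result.stdout


double_quoted = shell_sees('"pioreactor/+/+/+/volume/$settable"')
single_quoted = shell_sees("'pioreactor/+/+/+/volume/$settable'")

print(repr(double_quoted))
print(repr(single_quoted))
```

With double quotes the topic ends in a bare slash, which matches nothing; with single quotes the $settable level survives intact.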

Question 3.

I do this all the time: on the command line on a Pi, enter python3, and that’s the Python console (called a REPL).

Question 4.

It’s whichever machine broadcasts its alias last. Typically this happens at startup, so it’s usually the most recently booted one. This isn’t always true though. I would suggest changing the domain_alias for one of them to avoid conflicts.

Can I just ignore any potential conflicts and access each leader from their specific hostname?

Yup, this should always work.

I can see that the growth_rate and od_filtered topics have a dictionary as a payload and I would like to create new topics that also have a dictionary as payload.

2023-03-20T08:56:49 | pioreactor/pioreactor1/experiment/growth_rate_calculating/growth_rate {"growth_rate":0.015543124675650534,"timestamp":"2023-03-11T22:58:06.796459Z"}
2023-03-20T08:56:49 | pioreactor/pioreactor1/experiment/growth_rate_calculating/od_filtered {"od_filtered":2.6523560051435884,"timestamp":"2023-03-11T22:58:06.796459Z"}

Previously, I have been publishing payloads to the MQTT like below:
self.publish(f"pioreactor/{self.unit}/{get_latest_experiment_name()}/{job}/{attribute}/set", value)

Can I instead just do:

value = {"example1": 1, "example2": 2}
self.publish(f"pioreactor/{self.unit}/{get_latest_experiment_name()}/{job}/{attribute}/set", value)

and would this successfully publish the dictionary to that topic? My concern is how this might conflict with the currently existing published_settings.

  • What happens if I have published_settings={ "attribute": {"datatype": "float", "settable": True, "unit": "mL"}} and then try and publish a string to attribute, or a dictionary to attribute?

My intuition is that this might not be an issue because it seems like all the MQTT topics/values get converted to strings.

  • How are these published settings tied to the object attributes? I looked here, but want to explicitly clarify its behavior.

For example, if I have a class attribute (e.g., self.attribute = 0), and attribute is a published_setting, then if I enter self.attribute = 10, this change will be reflected in the MQTT topic, correct? And in the reverse situation, where I instead used self.publish(".../attribute/set", 20), would this change self.attribute to now hold a value of 20?

After writing all this out, I realized that I could probably figure out how to test this on my own via REPL. I’ll try and do that first in the future. :sweat_smile:

Yup, that should work. Internally, we convert dicts, lists, etc. to strings.
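
Roughly, the conversion looks like the sketch below. It assumes the dictionary is serialized as JSON with the standard library; the exact encoder used internally may differ.

```python
import json

value = {"example1": 1, "example2": 2}

payload = json.dumps(value)     # what goes over the wire is a string
restored = json.loads(payload)  # what a subscriber can rebuild from it

print(payload)
print(restored == value)
```

Note that anything sent this way has to be JSON-serializable, so plain dicts, lists, numbers and strings are fine but arbitrary Python objects are not.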

then try and publish a string to attribute, or a dictionary to attribute?

My advice is three-fold:
i) avoid, if you can, publishing to an existing topic unless your schema (dict/json format) looks the same. Feel free to create new topics though!
ii) generally, avoid mixing “types” in a topic. So if you publish json/dicts, only publish json/dicts.
iii) The datatype field should correspond to what you’re publishing because internally we parse the incoming data based on the value of datatype. Here’s a list of types you can use: pioreactor/pioreactor/background_jobs/base.py at master · Pioreactor/pioreactor · GitHub

It’s some Python magic. When you do something like chemostat.volume = x or self.volume = y, Python calls an internal __setattr__ function. We override this: pioreactor/pioreactor/background_jobs/base.py at master · Pioreactor/pioreactor · GitHub
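
Here is a stripped-down sketch of the idea. PublishingJob is a hypothetical stand-in that records messages in a list instead of sending them over MQTT, and the topic string is made up; the real override does more than this.

```python
from typing import Any, Dict, List, Tuple


class PublishingJob:
    """Hypothetical stand-in for a job class; records messages instead of publishing them."""

    published_settings: Dict[str, Dict[str, Any]] = {
        "volume": {"datatype": "float", "settable": True, "unit": "mL"},
    }

    def __init__(self) -> None:
        self.outbox: List[Tuple[str, str]] = []
        self.volume = 0.0  # this assignment already goes through __setattr__

    def __setattr__(self, name: str, value: Any) -> None:
        super().__setattr__(name, value)  # normal attribute assignment first
        if name in self.published_settings:
            # Mirror the new value to the attribute's topic.
            self.outbox.append((f"pioreactor/unit/experiment/job/{name}", str(value)))


job = PublishingJob()
job.volume = 1.5
print(job.outbox)
```

Attributes that are not listed in published_settings, like outbox here, are assigned normally and nothing is recorded for them.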

And in the reverse situation, where I instead used self.publish(".../attribute/set", 20) , would this change self.attribute to now hold a value of 20?

Yea, this is done here, specifically here. The _set_attr_from_message method is a callback that is fired whenever a message is published to f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/+/set".

I’m making progress on it.

I’m curious if some of my code is redundant though.

class MyClass(BackgroundJobContrib):
    published_settings = {"new_dict": {"datatype": "json", "settable": True, "unit": "N/A"}}

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.subscribe_and_callback(
            self.update_attribute_function,
            f"pioreactor/{self.unit}/{get_latest_experiment_name()}/{self.job_name}/new_dict",
        )

    def set_new_dict(self, var_dict):
        # takes in dict, does some basic commands
        ...

    def update_attribute_function(self, message):
        # This was intended to perform the actual update whenever the attribute in published_settings changed over MQTT.
        # This function was supposed to take the message and send it to self.set_new_dict(message).
        decoded_dict = message.payload.decode()
        self.set_new_dict(decoded_dict)

I think I might be calling set_new_dict(…) twice with my code because I just found somewhere in the docs that said a set_<attribute> method is automatically called any time an attribute in published_settings has something posted to /set.

I’ve been using my method self.set_new_dict(self, var_dict: dict) as a method of setting this variable, but now I’m worried about what parameters are going to be passed to self.set_new_dict(…) if MQTT messages are automatically getting sent there.

Do I have to worry about whether, when the MQTT topic is updated (via publish /set), the parameter is going to be a json or dict? I know I can convert it via: var_dict = var_dict.payload.decode() if it is an MQTT object passed to it, but I’d like to know more about the underlying behavior.

I think I might be calling set_new_dict(…) twice with my code because I just found somewhere in the docs that said a set_<attribute> method is automatically called any time an attribute in published_settings has something posted to /set.

Yea, this code is doing extra work, and would call set_new_dict twice. Here’s a simpler refactor:

class MyClass(BackgroundJobContrib):
    published_settings = {"new_dict": {"datatype": "json", "settable": True}}

    def set_new_dict(self, data: dict):
        # takes in dict, does some basic commands
        ...

When a new message is published to pioreactor/.../new_dict/set, MyClass will act in the following way:

  1. It receives the message via an internal MQTT subscription, and since the datatype is "json", it will convert the payload to a Python dictionary (that’s this code). It does this by passing the entire payload to json.loads.
  2. It then checks if a function set_new_dict exists on the class, and if it does exist, it will call it and pass in the dictionary. set_new_dict does exist, so it’s called with the dictionary.
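
The two steps above can be sketched as follows. This is not Pioreactor's actual code: SettableJob and handle_set_message are hypothetical names, only the "json" and "float" datatypes are handled, and falling back to a plain assignment when no set_ method exists is my assumption.

```python
import json
from types import SimpleNamespace
from typing import Any


class SettableJob:
    """Hypothetical stand-in that mimics the two steps; not a real background job."""

    published_settings = {
        "new_dict": {"datatype": "json", "settable": True},
        "volume": {"datatype": "float", "settable": True},
    }

    def __init__(self) -> None:
        self.new_dict: dict = {}
        self.volume = 0.0
        self.calls: list = []

    def set_new_dict(self, data: dict) -> None:
        self.calls.append(data)
        self.new_dict = data

    def handle_set_message(self, message: Any) -> None:
        # Assumed topic shape: .../<attribute>/set
        attribute = message.topic.split("/")[-2]
        setting = self.published_settings.get(attribute)
        if setting is None or not setting["settable"]:
            return
        raw = message.payload.decode()
        # Step 1: parse the payload according to the declared datatype.
        value = json.loads(raw) if setting["datatype"] == "json" else float(raw)
        # Step 2: prefer a custom set_<attribute> method, otherwise assign directly.
        setter = getattr(self, f"set_{attribute}", None)
        if callable(setter):
            setter(value)
        else:
            setattr(self, attribute, value)


job = SettableJob()
job.handle_set_message(
    SimpleNamespace(topic="pioreactor/u/e/my_class/new_dict/set", payload=b'{"example1": 1}')
)
job.handle_set_message(
    SimpleNamespace(topic="pioreactor/u/e/my_class/volume/set", payload=b"1.5")
)
print(job.calls, job.volume)
```

So set_new_dict receives an already-parsed dictionary, not an MQTT message object, and there is no need to call .payload.decode() inside it.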