Advice for structuring data for MQTT and InfluxDB

I started about a year ago with a Raspberry Pi collecting some temperature readings, sending them to a function node to define them as a field for Influx, and then reading/querying Influx with Grafana.

Today I have about a dozen Pis and several units from Opto22 that run Node-RED, and I am collecting Modbus data (both RTU via RS485 and TCP) as well as various other sensor data besides temperature (feedrate, rotational speed, etc.). The whole thing has become a huge mess (although everything works). I never structured my Influx time series data to use tags, and even though a certain piece of equipment may have 5 sensors (temperature, pressure, etc.), I am sending them to Influx as 5 separate measurements with 5 different timestamps, which should not be necessary in my case. And oh yeah, along the way I discovered MQTT and see huge advantages in using that (in addition to Influx).

So I have decided to take a clean-sheet approach and start over fresh. I watched the Influx schema design webinar, brought myself up to speed on Influx 2.0 and some of the Flux language, and organized (on paper) the various tags and fields that make sense. Here is a summary of the tags:

Tag            Example values
EquipType      furnace, prewash, tank
EquipNumber    314, 6
EquipElement   zone1, zone2, elevator, pump, feeder
MeasType       actual, setpoint

and here is a summary of the fields:

Field             Type
temperature       integer
feedrate          integer
rotational_speed  float
position          boolean
pressure          float
level             integer
humidity          float
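
To make that concrete, here is a rough sketch of how I imagine a single point could look in InfluxDB line protocol, with the tags and fields above combined under one measurement and one timestamp. The measurement name and the values are just made up for illustration:

```
furnace_data,EquipType=furnace,EquipNumber=6,EquipElement=zone1,MeasType=actual temperature=627i,pressure=1.2 1625079600000000000
```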

Here is one way I envision this whole thing coming together:

In the above, the top block of nodes represents the data collected by the Opto22 device on a given piece of equipment (e.g. Furnace 6). Each sensor is collecting something different, such as temperature, feedrate, elevator position, pump rotational speed, etc. Instead of sending each sensor reading to Influx with its own timestamp, my plan is to consolidate (via the Join node?) and then feed the tag and field measurement data to the MQTT node (is JSON format the right approach?). I would definitely have to manipulate the data using function nodes and the buffer-parser nodes (as I presently do). The Raspberry Pi, which may be on the same machine but, say, at the other end with its own Ethernet connection, may be collecting temperature data from a different section (let's say zone 2 temperature). Again, I would have to manipulate the data before sending it to MQTT and Influx.

Finally, my thought was to have a separate PC (probably the same one running Grafana) that hosts the Influx DB.

I am not experienced with MQTT, but I believe the payload & topic would correspond to the field & tag, respectively. I believe that using JSON format would allow me to send all the readings and all the tags in one expression. For example, all of the sensor readings below would be sent to Influx with the same timestamp. The Influx tag descriptions below (x/y/z) should also be mirrored in the MQTT topics, correct?

  • 627 = furnace / 6 / zone1 / actual (this means the temperature controller on furnace 6 in zone 1 is reading 627 C)
  • 625 = furnace / 6 / zone1 / setpoint (this means the temperature controller's setpoint for zone 1 is 625 C)
  • 97.8 = furnace / 6 / pump / actual (this means the pump on furnace 6 is reading 97.8 Hz)
  • 100.0 = furnace / 6 / pump / setpoint (this means the pump on furnace 6 has a setpoint of 100.0 Hz)
  • TRUE = furnace / 6 / elevator (this means the elevator status on furnace 6 is UP)
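
As a concrete (untested) sketch of what I have in mind, a Function node feeding the MQTT-out node could build the topic from the equipment-level tags and put all of the readings for that instant into one JSON payload. The topic scheme and property names here are just placeholders I made up:

```javascript
// Function node sketch: publish one consolidated reading set for Furnace 6 to MQTT.
// Topic carries the equipment-level tags; payload carries the per-element readings
// plus a shared timestamp. (Topic layout and property names are my own invention.)
const equipType = "furnace";
const equipNumber = 6;

msg.topic = `plant/${equipType}/${equipNumber}`;
msg.payload = {
    timestamp: Date.now(),                     // one timestamp for all readings below
    zone1: { actual: 627, setpoint: 625 },     // temperatures, degC
    pump:  { actual: 97.8, setpoint: 100.0 },  // rotational speed, Hz
    elevator: true                             // TRUE = elevator is UP
};
return msg;
```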

You haven't talked about what Measurements you are planning in the database. Do you intend that all those tags and fields are all in one Measurement?
To answer how the Measurements are to be organised we need to know what fields are valid for which pieces of equipment. For example, does a tank have feedrate, rotational speed, and so on? You don't want all seven of those fields in one measurement if normally only half of them are present. That leaves lots of nulls cluttering up the database.
Often it is a good idea to start with the sensors (as does the exercise in the webinar) and assume that each sensor type is in its own measurement (with whatever tags are necessary). Then work out sensible combinations of those that are always measured at the same time and put those together. For example your tanks might have temperature and pressure and you measure them at the same instants, so put them into one measurement.
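
As an illustration only (measurement names and values are made up), that might look something like this in line protocol: one measurement per sensor type, plus a combined measurement where two sensors really are sampled at the same instant:

```
temperature,EquipType=furnace,EquipNumber=6,EquipElement=zone1,MeasType=actual value=627i 1625079600000000000
rotational_speed,EquipType=furnace,EquipNumber=6,EquipElement=pump,MeasType=actual value=97.8 1625079600000000000

tank_readings,EquipType=tank,EquipNumber=3 temperature=41i,pressure=2.7 1625079600000000000
```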

I will indeed do a mock-up showing, for each piece of equipment, which fields are applicable (now or down the road), which will actually be supplied, and at what frequency. If I did have 7 fields and all of them were present (no nulls), would that pose a problem for Influx or Grafana? I do not think it would.

Just one general question re: the Join node. If I had, for example, temperature being recorded every minute and pressure being recorded every minute on a tank, but from two different Node-RED devices (e.g. an Opto22 device and an RPi), then I believe it would make sense to use a Join node and send them into Influx as a single measurement with a single timestamp. The potential problem I see is that if the thermocouple got temporarily disconnected, the payload (temperature value) would never arrive at the Join node, and there would be no entry (for either temperature or pressure) in Influx. From my experience, this is the only benefit of my current spaghetti mess where each sensor sends data to Influx independently of the others. The downside is that the database has far too many entries, no use of tags, poor querying, etc. Maybe there is a technique where a Function node does the consolidation but does not hold up the measurement entry into Influx if one sensor is not working.

That would not be a problem, no. The nulls don't cause a problem for Influx; it is just inefficient, wasting disc space, increasing query times and so on. Similarly, mixing a value measured every ten seconds with one measured every minute, and adding repeated values for the slow one just to fill in the nulls, would also be inefficient.

To avoid that you can do things like using a Trigger node that is started when one of the values arrives and, after say 0.75 seconds, sends a message which is used in the Join node to release the current value. Configure the Join so that it releases on that message, and also send the released message back to the Trigger and use it to reset it. The result is that if only one of the values arrives it will still be released by the Trigger node, but if both arrive they are released together and the Trigger is reset. However, I really wouldn't bother. Send them to separate measurements, but make sure you are using tags appropriately.
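
For completeness, if you did want to do it in a single Function node rather than Trigger + Join, a rough (untested) sketch of the same idea is to buffer whatever arrives and flush after a short window, so a missing sensor never blocks the others:

```javascript
// Function node sketch: merge readings that arrive within a short window,
// then send on whatever turned up, even if one sensor never reported.
// Assumes each incoming msg has msg.topic = field name and msg.payload = value.
const WINDOW_MS = 750;   // how long to wait for the other readings

let buffer = context.get("buffer") || {};
buffer[msg.topic] = msg.payload;
context.set("buffer", buffer);

// Start the flush timer on the first message of a new window.
if (!context.get("timer")) {
    const timer = setTimeout(() => {
        const fields = context.get("buffer") || {};
        context.set("buffer", {});
        context.set("timer", null);
        // Send one combined message downstream (e.g. to the influx node).
        node.send({ payload: fields });
    }, WINDOW_MS);
    context.set("timer", timer);
}

return null;   // nothing is sent until the timer fires
```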

Are you talking about hundreds of pieces of plant?

OK, thanks for the explanations. No, not hundreds of pieces of equipment in the plant, but eventually 15 to 20, each with 5 to 15 sensors monitoring various parameters. The data is coming from 4 sources: Modbus RTU (RS485) devices, Modbus TCP devices, Opto22 devices, and Raspberry Pis.

I plan to organize my Modbus data using the buffer-parser (as you outlined here) and assign tags before sending it to Influx, and for the Opto22 devices and RPis, send the data to Influx using tags as well. So for a piece of equipment that has both Modbus and non-Modbus data, I will send separate measurements with meaningful tags and few (if any) null values. I will use the Join/Trigger nodes only where it absolutely makes sense.
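
For the tagging step, my rough plan (untested, and assuming I have read the node-red-contrib-influxdb batch node's docs correctly) is a Function node after the buffer-parser that attaches the tags and builds one point per equipment reading:

```javascript
// Function node sketch: turn parsed Modbus values into one tagged Influx point.
// Assumes the influx batch node accepts an array of point objects with
// measurement / tags / fields / timestamp properties, and that the
// buffer-parser has already put named values on msg.payload.
msg.payload = [{
    measurement: "furnace_readings",           // made-up measurement name
    tags: {
        EquipType: "furnace",
        EquipNumber: "6",
        EquipElement: "zone1",
        MeasType: "actual"
    },
    fields: {
        temperature: msg.payload.temperature,  // from buffer-parser output
        pressure: msg.payload.pressure
    },
    timestamp: new Date()
}];
return msg;
```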

There are many posts on MQTT & Influx here on the forum, so I should be able to get the data organized, but am I correct to assume that the MQTT payload & topic would correspond to the Influx field & tag, respectively?

MQTT and Influx are not directly related at all.

Do you mean that you want to pass the data via MQTT then use the topic/payload to automatically determine what to write the data to influx?
If so then what particular benefit do you see in passing it via MQTT, or are you trying to centralise the data using MQTT before writing it to influx?

Yes, passing the sensor data via MQTT was generally intended to aggregate / centralize the data before writing to Influx.

But a future benefit of using MQTT is that I could set up Grafana streaming (setting up an MQTT datasource is currently a DIY-type thing, but I am told the MQTT datasource will be ready for the masses by year end).

PS: Forgot to mention that InfluxDB is cloud-hosted, so when the Internet goes out, Grafana (running on a local PC) can't see anything. Having MQTT on the LAN would allow Grafana to at least see the live data (though not the historical data) when the Internet connection goes down.

If you want MQTT topics mapping to Influx data then you will need to encode the database (unless it is all in one database), the measurement, the retention policy (which you haven't mentioned, but if you are reworking the schema then definitely think about retention policy before proceeding, otherwise you will find yourself starting again at some point), the tags and the fields.
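
Just as an illustration of what that encoding might involve (the topic layout here is invented, not a convention, and it leaves out the database and retention policy for brevity), a Function node between MQTT-in and the influx node would have to unpack something like this:

```javascript
// Function node sketch: unpack an MQTT topic such as
//   plantdata/<measurement>/<EquipType>/<EquipNumber>/<EquipElement>/<MeasType>/<field>
// into one tagged point. The topic layout is invented for illustration, and I am
// assuming the batch node's measurement / tags / fields / timestamp point format.
const [, measurement, equipType, equipNumber, equipElement, measType, field] =
    msg.topic.split("/");

msg.payload = [{
    measurement: measurement,
    tags: {
        EquipType: equipType,
        EquipNumber: equipNumber,
        EquipElement: equipElement,
        MeasType: measType
    },
    fields: { [field]: Number(msg.payload) },
    timestamp: new Date()
}];
return msg;
```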
