MQTT client service - proof of concept/alpha code


bell...@gmail.com

Apr 9, 2019, 4:45:51 PM
to weewx-user
To further my WeeWX/MQTT/Python knowledge, I have written a multi-threaded WeeWX service. One thread receives the MQTT messages and puts them on a queue. In the other thread, when a NEW_ARCHIVE_RECORD event is received, it processes the queue and updates the archive record with the data.
Currently, processing the queue means taking the last element and using its data to update the archive record. My next big education will be to develop some type of accumulation of these (essentially software generation).
Also, the MQTT payload is expected to be JSON. I am hoping that other formats can easily be supported by creating a subclass and overriding the function that maps the MQTT payload to a WeeWX-like dictionary.
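
For readers who want a picture of the design described above, here is a minimal sketch. It is not the code from the repository: the class names (PayloadMapper, KeyValueMapper, Subscriber) and the sample payloads are made up for illustration, and no MQTT library is used. A plain thread stands in for the MQTT receiving thread.

```python
import json
import queue
import threading


class PayloadMapper:
    """Maps a raw MQTT payload to a WeeWX-like dictionary (JSON by default)."""

    def to_dict(self, payload):
        return json.loads(payload)


class KeyValueMapper(PayloadMapper):
    """Hypothetical subclass for 'name=value,name=value' payloads."""

    def to_dict(self, payload):
        items = payload.decode("utf-8").split(",")
        pairs = (item.split("=", 1) for item in items)
        return {name: float(value) for name, value in pairs}


class Subscriber:
    def __init__(self, mapper):
        self.mapper = mapper
        self.queue = queue.Queue()

    def on_message(self, payload):
        # Runs in the receiving thread: convert the payload and enqueue it.
        self.queue.put(self.mapper.to_dict(payload))

    def new_archive_record(self, record):
        # Runs in the main thread: drain the queue, keep only the last element.
        last = None
        while True:
            try:
                last = self.queue.get_nowait()
            except queue.Empty:
                break
        if last is not None:
            record.update(last)
        return record


def receive(subscriber):
    # Stand-in for the MQTT thread delivering two messages.
    for payload in (b'{"extraTemp1": 20.5}', b'{"extraTemp1": 21.0}'):
        subscriber.on_message(payload)


subscriber = Subscriber(PayloadMapper())
receiver = threading.Thread(target=receive, args=(subscriber,))
receiver.start()
receiver.join()

record = subscriber.new_archive_record({"dateTime": 1554842751, "outTemp": 55.0})
```

Supporting another payload format then only means passing a different mapper, for example Subscriber(KeyValueMapper()).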
Being a proof of concept, it has little documentation, minimal error handling, limited functionality, no install support, etc. But I thought that even in this form it might be useful/educational.
If interested, it can be found here, https://github.com/bellrichm/weewx-addons/blob/master/user/MQTTSubscribeService.py
- Rich

Ralph Underwood

Apr 9, 2019, 6:08:37 PM
to weewx-user
Bravo! Look forward to testing next month when I should have some free time. 

gjr80

Apr 11, 2019, 4:45:44 AM
to weewx-user
Hi,

Before you start looking into software record generation you might want to consider the software record generation capabilities already built in to WeeWX. WeeWX can handle two broad groups of data, archive data (aka archive records) and loop data (aka loop packets). You might want to read Loop packets vs. archive records in the Customization Guide. In a nutshell the driver can generate an archive record and pass it to WeeWX (this is hardware record generation) or WeeWX can synthesise an archive record from accumulated loop data (this is software record generation). You might want to read about the record_generation config option under [StdArchive] in the User's Guide.

If you are receiving frequent data packets in your queue, you could bind to NEW_LOOP_PACKET instead of NEW_ARCHIVE_RECORD and add your data to the loop packet. Provided you are using software record generation, WeeWX will then accumulate your loop data (along with all the rest of the loop data) and synthesise an archive record at the end of the archive period. One thing you would need to be careful of is your existing driver: if you select software record generation then it is one in, all in. In other words, if you are presently using hardware record generation, then when you change to software record generation the driver will no longer produce hardware-generated archive records, only loop packets. This should not be a problem in the vast majority of cases, though.
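
To make the idea of software record generation concrete, here is a rough sketch of loop packets being accumulated into one archive record. This is a simplified stand-in, not WeeWX code: the function name synthesise_record is made up, and it only averages each observation, whereas the real accumulator also tracks things such as highs, lows and sums.

```python
def synthesise_record(loop_packets, end_ts):
    """Average each observation across the loop packets of one archive period."""
    sums, counts = {}, {}
    for packet in loop_packets:
        for name, value in packet.items():
            # Skip bookkeeping fields and missing values.
            if name in ("dateTime", "usUnits") or value is None:
                continue
            sums[name] = sums.get(name, 0.0) + value
            counts[name] = counts.get(name, 0) + 1
    record = {name: sums[name] / counts[name] for name in sums}
    record["dateTime"] = end_ts
    return record


# extraTemp1 is the kind of value a service could add to a loop packet.
packets = [
    {"dateTime": 100, "outTemp": 10.0},
    {"dateTime": 110, "outTemp": 12.0, "extraTemp1": 20.0},
    {"dateTime": 120, "outTemp": 14.0, "extraTemp1": 22.0},
]
record = synthesise_record(packets, end_ts=300)
```

The point is that data added to loop packets ends up in the archive record without the service having to do its own accumulation.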

Gary

bell...@gmail.com

Apr 12, 2019, 2:17:36 PM
to weewx-user
Gary,
Thanks for your insight. My current use case is to augment the data from an existing weather station that may be using hardware generation. So, I can't rely on StdArchive to perform the software generation of the additional data. Even if I could, and bound the processing to a NEW_LOOP_PACKET event, I'd still have the challenge that there could be 0 to n messages received from MQTT between NEW_LOOP_PACKET events.

I have been studying StdArchive and how it performs software generation with the Accum class. I understand the risks of relying on WeeWX internals, but leveraging this class looks to make it straightforward to process the MQTT data into "archive data". Plus, it gives me an excuse to dive further into the details of WeeWX.

- Rich

vince

Apr 12, 2019, 4:59:51 PM
to weewx-user
On Friday, April 12, 2019 at 11:17:36 AM UTC-7, bell...@gmail.com wrote:
Gary,
Thanks for your insight. My current use case is to augment the data from an existing weather station that may be using hardware generation. So, I can't rely on StdArchive to perform the software generation of the additional data. Even if I could, and bound the processing to a NEW_LOOP_PACKET event, I'd still have the challenge that there could be 0 to n messages received from MQTT between NEW_LOOP_PACKET events.



Take a look at the weatherflow-udp driver. It listens for UDP broadcasts in JSON format and seeds weewx's database with the pertinent values, mapped in weewx.conf. It's pretty similar in concept to you subscribing to MQTT messages that might be partial measurements; it just has different types of JSON observations that it's grabbing the good stuff out of.

FWIW, the result is exactly what I get from writing my own listener to InfluxDB and processing it with Grafana into a dashboard, so it's definitely doing the right thing re: getting various partial measurements into weewx. Theoretically, if you replaced its 'listen for UDP broadcasts' part with an MQTT subscription, maybe you'd be able to get there.




bell...@gmail.com

Apr 12, 2019, 7:31:18 PM
to weewx-user
Thanks. Since that is a driver, it gets to generate loop packets and then the StdArchive service can software create the archive record. My goal is to write a service that can add “archive data” to an existing WeeWX install/station/driver. So, I need to manage the incoming data and add it to an existing archive record.
I’ve got something up and running. Still very rough around the edges. The more I delve into the details of WeeWX, the more I am impressed with the work Tom and company have done. If only more software were so well architected.
- Rich

bell...@gmail.com

Apr 23, 2019, 7:36:17 PM
to weewx-user
After experimenting and developing more, I decided this needs its own repository. You can now find it here: https://github.com/bellrichm/WeeWX-MQTTSubscribe

The service now supports binding to either new loop packets or new archive records. Since much of the code was similar to my MQTT driver, this has been added to the module. It is still a manual install process, but some documentation has been added.
-rich

Eugen66

Apr 25, 2019, 10:08:22 AM
to weewx-user
Hi,
I'm really trying to get your MQTT subscription to work; this is a very useful way of adding values to a great piece of weather software :)
I use SDR as the driver (for the moment).
When I try to run it standalone, I get this:

 PYTHONPATH=/usr/share/ python /usr/share/weewx//user/MQTTSubscribe.py --type service --binding archive --interval 300 --delay 15 --records 2 weewx.conf
Traceback (most recent call last):
  File "/usr/share/weewx//user/MQTTSubscribe.py", line 75, in <module>
    import weeutil.weeutil
ImportError: No module named weeutil.weeutil

and WeeWX fails to start when I add MQTTSubscribe to weewx.conf.

Any suggestions?

this is the log:
Apr 25 15:50:26 weewx-hyggebu systemd[1]: Starting LSB: weewx weather system...
Apr 25 15:50:27 weewx-hyggebu weewx[9314]: engine: Initializing weewx version 3.9.1
Apr 25 15:50:27 weewx-hyggebu weewx[9314]: engine: Using Python 2.7.13 (default, Sep 26 2018, 18:42:22) #012[GCC 6.3.0 20170516]
Apr 25 15:50:27 weewx-hyggebu weewx[9314]: engine: Platform Linux-4.14.98-v7+-armv7l-with-debian-9.8
Apr 25 15:50:27 weewx-hyggebu weewx[9314]: engine: Locale is 'en_GB.UTF-8'
Apr 25 15:50:27 weewx-hyggebu weewx[9314]: engine: pid file is /var/run/weewx.pid
Apr 25 15:50:27 weewx-hyggebu weewx[9303]: Starting weewx weather system: weewx.
Apr 25 15:50:27 weewx-hyggebu systemd[1]: Started LSB: weewx weather system.
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: engine: Using configuration file /etc/weewx/weewx.conf
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: engine: Debug is 1
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: engine: Initializing engine
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: engine: Loading station type SDR (user.sdr)
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: sdr: MainThread: driver version is 0.62
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: sdr: MainThread: sensor map is {'inTemp': 'temperature.8:1.AcuriteWT450Packet', 'outTemp': 'temperature.1:1.AcuriteWT450Packet', 'outHumidity': '', 'extraTemp1': 'temperature.4:5.HidekiWindPacket', 'extraHumidity1': '', 'extraTemp2': '', 'extraHumidity3': '', 'rain': 'rain_total.4:0.HidekiRainPacket', 'rainBatteryStatus': 'battery.4:0.HidekiRainPacket', 'UV': 'uv_index.1:137.OSUV800Packet', 'outTempBatteryStatus': 'battery.1:137.OSUV800Packet', 'windDir': 'wind_dir.4:5.HidekiWindPacket', 'windGust': 'wind_gust.4:5.HidekiWindPacket', 'windSpeed': 'wind_speed.4:5.HidekiWindPacket', 'windBatteryStatus': 'battery.4:5.HidekiWindPacket'}
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: sdr: MainThread: deltas is {'strikes': 'strikes_total', 'rain': 'rain_total'}
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: sdr: MainThread: startup process 'rtl_433 -M utc -F json -G'
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: sdr: stdout-thread: start async reader for stdout-thread
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: sdr: stderr-thread: start async reader for stderr-thread
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: engine: Loading service weewx.engine.StdTimeSynch
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: engine: Finished loading service weewx.engine.StdTimeSynch
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: engine: Loading service user.MQTTSubscribe.MQTTSubscribeService
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: MQTTSS: Client id is weewx
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: MQTTSS: Binding is loop
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: MQTTSS: Default units is US 1
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: MQTTSS: Overlap is 0.0
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: MQTTSS: Host is 10.0.10.165
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: MQTTSS: Port is 1883
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: MQTTSS: Keep alive is 70
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: MQTTSS: Username is xxxxxx
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: MQTTSS: Password is set
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: MQTTSS: Topic is Weather/
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: MQTTSS: Archive topic is None
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: MQTTSS: Payload type is individual
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: MQTTSS: Default units is 1
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: MQTTSS: Label map is {'outHumidity': 'outHumidity', 'overlap': '20', 'binding': 'loop'}
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: sdr: MainThread: shutdown process rtl_433 -M utc -F json -G
Apr 25 15:50:27 weewx-hyggebu weewx[9318]: sdr: MainThread: waiting for stdout-thread
Apr 25 15:50:31 weewx-hyggebu weewx[9318]: sdr: MainThread: waiting for stderr-thread
Apr 25 15:50:41 weewx-hyggebu weewx[9318]: sdr: MainThread: timed out waiting for stderr-thread
Apr 25 15:50:41 weewx-hyggebu weewx[9318]: sdr: MainThread: close stdout
Apr 25 15:50:41 weewx-hyggebu weewx[9318]: sdr: MainThread: close stderr
Apr 25 15:50:41 weewx-hyggebu weewx[9318]: engine: Caught unrecoverable exception in engine:
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****  cannot convert argument to integer
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****  Traceback (most recent call last):
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****    File "/usr/share/weewx/weewx/engine.py", line 884, in main
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****      engine = engine_class(config_dict)
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****    File "/usr/share/weewx/weewx/engine.py", line 78, in __init__
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****      self.loadServices(config_dict)
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****    File "/usr/share/weewx/weewx/engine.py", line 142, in loadServices
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****      self.service_obj.append(weeutil.weeutil._get_object(svc)(self, config_dict))
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****    File "/usr/share/weewx/user/MQTTSubscribe.py", line 263, in __init__
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****      self.thread = MQTTSubscribeServiceThread(self, self.client, self.queue, self.archive_queue, label_map, unit_system, payload_type, host, keepalive, port, username, password, topic, archive_topic)
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****    File "/usr/share/weewx/user/MQTTSubscribe.py", line 326, in __init__
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****      MQTTSubscribe.__init__(self, client, queue, archive_queue, label_map, unit_system, payload_type, host, keepalive, port, username, password, topic, archive_topic)
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****    File "/usr/share/weewx/user/MQTTSubscribe.py", line 135, in __init__
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****      self.client.connect(host, port, keepalive)
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****    File "/usr/local/lib/python2.7/dist-packages/paho/mqtt/client.py", line 839, in connect
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****      return self.reconnect()
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****    File "/usr/local/lib/python2.7/dist-packages/paho/mqtt/client.py", line 1009, in reconnect
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****      return self._send_connect(self._keepalive, self._clean_session)
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****    File "/usr/local/lib/python2.7/dist-packages/paho/mqtt/client.py", line 2361, in _send_connect
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****      keepalive))
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****  error: cannot convert argument to integer
Apr 25 15:50:41 weewx-hyggebu weewx[9318]:     ****  Exiting.

bell...@gmail.com

Apr 25, 2019, 10:47:02 AM
to weewx-user
Hi,
I have only installed via setup.py, but I think the problem with running standalone is that after the PYTHONPATH= it should be /usr/share/weewx
PYTHONPATH=/usr/share/weewx python /usr/share/weewx//user/MQTTSubscribe.py --type service --binding archive --interval 300 --delay 15 --records 2 weewx.conf

As for the real error, it appears that the keepalive value might need to be divisible by 60. I can reproduce the error when I set it to 70, but I am still researching.
- Rich

Eugen66

Apr 25, 2019, 11:13:58 AM
to weewx-user
Thank you so much for helping :)
I installed using apt.

When I run it from the /etc/weewx folder, I get this:
:/etc/weewx# PYTHONPATH=/usr/share/weewx python /usr/share/weewx//user/MQTTSubscribe.py --type service --binding archive --interval 300  --delay 15 --records 2 weewx.conf
Simulation is service
Creating 2 archive records
Interval is 300 seconds
Delay is 15 seconds
Traceback (most recent call last):
  File "/usr/share/weewx//user/MQTTSubscribe.py", line 557, in <module>
    main()
  File "/usr/share/weewx//user/MQTTSubscribe.py", line 479, in main
    simulate_service(engine, config_dict, binding, record_count, interval, delay, units)
  File "/usr/share/weewx//user/MQTTSubscribe.py", line 523, in simulate_service
    service = MQTTSubscribeService(engine, config_dict)
  File "/usr/share/weewx//user/MQTTSubscribe.py", line 263, in __init__
    self.thread = MQTTSubscribeServiceThread(self, self.client, self.queue, self.archive_queue, label_map, unit_system, payload_type, host, keepalive, port, username, password, topic, archive_topic)
  File "/usr/share/weewx//user/MQTTSubscribe.py", line 326, in __init__
    MQTTSubscribe.__init__(self, client, queue, archive_queue, label_map, unit_system, payload_type, host, keepalive, port, username, password, topic, archive_topic)
  File "/usr/share/weewx//user/MQTTSubscribe.py", line 135, in __init__
    self.client.connect(host, port, keepalive)
  File "/usr/local/lib/python2.7/dist-packages/paho/mqtt/client.py", line 839, in connect
    return self.reconnect()
  File "/usr/local/lib/python2.7/dist-packages/paho/mqtt/client.py", line 1009, in reconnect
    return self._send_connect(self._keepalive, self._clean_session)
  File "/usr/local/lib/python2.7/dist-packages/paho/mqtt/client.py", line 2361, in _send_connect
    keepalive))
struct.error: cannot convert argument to integer

And trying with 60:

Apr 25 17:11:20 weewx-hyggebu weewx[14390]: MQTTSS: Keep alive is 60
Apr 25 17:11:20 weewx-hyggebu weewx[14390]: MQTTSS: Username is openhabian
Apr 25 17:11:20 weewx-hyggebu weewx[14390]: MQTTSS: Password is set
Apr 25 17:11:20 weewx-hyggebu weewx[14390]: MQTTSS: Topic is Weather/
Apr 25 17:11:20 weewx-hyggebu weewx[14390]: MQTTSS: Archive topic is None
Apr 25 17:11:20 weewx-hyggebu weewx[14390]: MQTTSS: Payload type is individual
Apr 25 17:11:20 weewx-hyggebu weewx[14390]: MQTTSS: Default units is 1
Apr 25 17:11:20 weewx-hyggebu weewx[14390]: MQTTSS: Label map is {'outHumidity': 'outHumidity', 'overlap': '20', 'binding': 'loop'}
Apr 25 17:11:20 weewx-hyggebu weewx[14390]: sdr: MainThread: shutdown process rtl_433 -M utc -F json -G
Apr 25 17:11:20 weewx-hyggebu weewx[14390]: sdr: MainThread: waiting for stdout-thread
Apr 25 17:11:24 weewx-hyggebu weewx[14390]: sdr: MainThread: waiting for stderr-thread
Apr 25 17:11:34 weewx-hyggebu weewx[14390]: sdr: MainThread: timed out waiting for stderr-thread
Apr 25 17:11:34 weewx-hyggebu weewx[14390]: sdr: MainThread: close stdout
Apr 25 17:11:34 weewx-hyggebu weewx[14390]: sdr: MainThread: close stderr
Apr 25 17:11:34 weewx-hyggebu weewx[14390]: engine: Caught unrecoverable exception in engine:
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****  cannot convert argument to integer
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****  Traceback (most recent call last):
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****    File "/usr/share/weewx/weewx/engine.py", line 884, in main
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****      engine = engine_class(config_dict)
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****    File "/usr/share/weewx/weewx/engine.py", line 78, in __init__
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****      self.loadServices(config_dict)
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****    File "/usr/share/weewx/weewx/engine.py", line 142, in loadServices
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****      self.service_obj.append(weeutil.weeutil._get_object(svc)(self, config_dict))
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****    File "/usr/share/weewx/user/MQTTSubscribe.py", line 263, in __init__
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****      self.thread = MQTTSubscribeServiceThread(self, self.client, self.queue, self.archive_queue, label_map, unit_system, payload_type, host, keepalive, port, username, password, topic, archive_topic)
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****    File "/usr/share/weewx/user/MQTTSubscribe.py", line 326, in __init__
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****      MQTTSubscribe.__init__(self, client, queue, archive_queue, label_map, unit_system, payload_type, host, keepalive, port, username, password, topic, archive_topic)
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****    File "/usr/share/weewx/user/MQTTSubscribe.py", line 135, in __init__
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****      self.client.connect(host, port, keepalive)
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****    File "/usr/local/lib/python2.7/dist-packages/paho/mqtt/client.py", line 839, in connect
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****      return self.reconnect()
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****    File "/usr/local/lib/python2.7/dist-packages/paho/mqtt/client.py", line 1009, in reconnect
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****      return self._send_connect(self._keepalive, self._clean_session)
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****    File "/usr/local/lib/python2.7/dist-packages/paho/mqtt/client.py", line 2361, in _send_connect
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****      keepalive))
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****  error: cannot convert argument to integer
Apr 25 17:11:34 weewx-hyggebu weewx[14390]:     ****  Exiting.

Rich Bell

Apr 25, 2019, 1:44:08 PM
to weewx...@googlegroups.com
--
You received this message because you are subscribed to the Google Groups "weewx-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to weewx-user+...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Rich Bell

Apr 25, 2019, 1:47:23 PM
to weewx...@googlegroups.com
I think I am not handling the options correctly, so the keepalive is a string and not an integer. For now, leave it out of weewx.conf and let it default to 60.
- rich
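
The diagnosis above can be reproduced without a broker. The sketch below assumes, based on the traceback, that the keepalive ends up in a struct.pack call as an unsigned 16-bit field, and that values read from weewx.conf arrive as strings. The helper names pack_keepalive and read_int_option are made up for illustration and are not part of paho or MQTTSubscribe.

```python
import struct


def pack_keepalive(keepalive):
    """Pack the keepalive as an unsigned 16-bit big-endian integer."""
    return struct.pack("!H", keepalive)


def read_int_option(options, name, default):
    """Read a config option and convert it to int (hypothetical helper)."""
    return int(options.get(name, default))


# Values read from a config file are strings unless converted.
options = {"keepalive": "70"}

try:
    pack_keepalive(options["keepalive"])
    string_failed = False
except struct.error:
    # Same class of failure as in the traceback earlier in the thread.
    string_failed = True

packed = pack_keepalive(read_int_option(options, "keepalive", 60))
```

Seen this way, the failure has nothing to do with the value being divisible by 60: any value given in the config file fails while it is still a string.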


wysiwyg

Apr 25, 2019, 2:19:34 PM
to weewx-user
Hi there!
May I ask what the differences are between this new driver and the existing one? (I guess JSON, at least.)
Best regards,

Eugen66

Apr 25, 2019, 2:36:02 PM
to weewx-user
Thank you ;) With keepalive left out, it is running.
I'm testing by trying to subscribe to Weather/outHumidity.
This is the config:
[MQTTSubscribeService] 
    # The MQTT server 
    host = 10.0.10.165

    # The port to connect to
    port = 1883

    # Maximum period in seconds allowed between communications with the broker
    #keepalive =60

    # username for broker authentication
    username = xxxxxxxx

    # password for broker authentication
    password = xxxxxxx

    # Units for MQTT payloads without unit value
    unit_system_name = 'METRIC'  # US or 'METRIC' or 'METRICWX'

    # The clientid to connect with
    clientid = weewx

    # The topic to subscribe to
    topic = Weather/

    # The format of the MQTT payload. Currently support 'individual' or 'json'
    payload_type = individual

    # Mapping to WeeWX names
    [[label_map]]
        outHumidity = outHumidity

    # The amount to overlap the start time when processing the MQTT queue
    # Only used by the service
    overlap = 20

    # The binding, loop or archive
    # Only used by the service
    binding = loop

    # The amount of time to wait when the queue of MQTT payload is empty
    # Only used by the driver
    # wait_before_retry = 2

    # Payload in this topic is processed like an archive record
    # Only used by the driver
    # archive_topic = 

I can't see that it is mapping outHumidity. This is part of the log:
 


Apr 25 20:28:19 weewx-hyggebu weewx[23083]: sdr: MainThread: packet={'outTemp': 4.562, 'usUnits': 16, 'dateTime': 1556216894}
Apr 25 20:28:19 weewx-hyggebu weewx[23083]: MQTTSS: Processing interval: 1556216887.000000 1556216894.000000
Apr 25 20:28:19 weewx-hyggebu weewx[23083]: MQTTSS: Queue size is: 0
Apr 25 20:28:19 weewx-hyggebu weewx[23083]: MQTTSS: Queue was empty
Apr 25 20:28:19 weewx-hyggebu weewx[23083]: MQTTSS: Packet after update is: dateTime: 1556216894, outTemp: 4.562, usUnits: 16
Apr 25 20:28:19 weewx-hyggebu weewx[23083]: sdr: MainThread: parse_json: unknown model Acurite Rain Gauge
Apr 25 20:28:19 weewx-hyggebu weewx[23083]: sdr: MainThread: punt unrecognized line '{"time" : "2019-04-25 18:28:14", "model" : "Acurite Rain Gauge", "id" : 52, "rain" : 290.500}#012'
Apr 25 20:28:19 weewx-hyggebu weewx[23083]: sdr: MainThread: ignoring duplicate packet {'outTemp': 4.562, 'usUnits': 16, 'dateTime': 1556216894}
Apr 25 20:28:19 weewx-hyggebu weewx[23083]: sdr: MainThread: packet={'outTemp': 4.562, 'usUnits': 16, 'dateTime': 1556216895}
Apr 25 20:28:19 weewx-hyggebu weewx[23083]: MQTTSS: Processing interval: 1556216894.000000 1556216895.000000
Apr 25 20:28:19 weewx-hyggebu weewx[23083]: MQTTSS: Queue size is: 0
Apr 25 20:28:19 weewx-hyggebu weewx[23083]: MQTTSS: Queue was empty

Apr 25 20:33:22 weewx-hyggebu weewx[23083]: MQTTSS: Connected with result code 5
Apr 25 20:33:22 weewx-hyggebu weewx[23083]: MQTTSS: Disconnected with result code 5
Apr 25 20:33:24 weewx-hyggebu weewx[23083]: sdr: MainThread: lines=['{"time" : "2019-04-25 18:33:21", "model" : "HIDEKI Wind sensor", "rc" : 5, "channel" : 4, "battery" : "OK", "temperature_C" : 4.700, "wind_speed_mph" : 0.900, "gust_speed_mph" : 0.000, "wind_approach" : 0, "wind_direction" : 292.500, "mic" : "CRC"}\n', '{"time" : "2019-04-25 18:33:21", "model" : "HIDEKI Wind sensor", "rc" : 5, "channel" : 4, "battery" : "OK", "temperature_C" : 4.700, "wind_speed_mph" : 0.900, "gust_speed_mph" : 0.000, "wind_approach" : 0, "wind_direction" : 292.500, "mic" : "CRC"}\n', '{"time" : "

regards
Atle

Eugen66

Apr 25, 2019, 3:30:54 PM
to weewx-user
Sorry :( My bad. I tracked down the MQTT disconnect with result code 5: wrong password in the conf.
So now it connects and finds the data, but WeeWX exits with this:

Apr 25 21:27:51 weewx-hyggebu weewx[30607]: MQTTSS: Processing interval: 1556220434.000000 1556220467.000000
Apr 25 21:27:51 weewx-hyggebu weewx[30607]: MQTTSS: Queue size is: 6
Apr 25 21:27:51 weewx-hyggebu weewx[30607]: MQTTSS: Processing: dateTime: 1556220439.64, extraTemp2: 13.5, usUnits: 1
Apr 25 21:27:51 weewx-hyggebu weewx[30607]: engine: Main loop exiting. Shutting engine down.
Apr 25 21:27:51 weewx-hyggebu weewx[30607]: restx: Shut down MQTT thread.
Apr 25 21:27:51 weewx-hyggebu weewx[30607]: MQTTSS: Disconnected with result code 0
Apr 25 21:27:51 weewx-hyggebu weewx[30607]: sdr: MainThread: shutdown process rtl_433 -M utc -F json -G
Apr 25 21:27:51 weewx-hyggebu weewx[30607]: sdr: MainThread: waiting for stdout-thread
Apr 25 21:27:52 weewx-hyggebu weewx[30607]: sdr: MainThread: waiting for stderr-thread
Apr 25 21:28:02 weewx-hyggebu weewx[30607]: sdr: MainThread: timed out waiting for stderr-thread
Apr 25 21:28:02 weewx-hyggebu weewx[30607]: sdr: MainThread: close stdout
Apr 25 21:28:02 weewx-hyggebu weewx[30607]: sdr: MainThread: close stderr
Apr 25 21:28:02 weewx-hyggebu weewx[30607]: engine: Caught unrecoverable exception in engine:
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****  accum: ScalarStats.addHiLo expected float or int, got 13.5
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****  Traceback (most recent call last):
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****    File "/usr/share/weewx/weewx/engine.py", line 890, in main
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****      engine.run()
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****    File "/usr/share/weewx/weewx/engine.py", line 191, in run
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****      self.dispatchEvent(weewx.Event(weewx.NEW_LOOP_PACKET, packet=packet))
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****    File "/usr/share/weewx/weewx/engine.py", line 224, in dispatchEvent
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****      callback(event)
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****    File "/usr/share/weewx/user/MQTTSubscribe.py", line 309, in new_loop_packet
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****      target_data = self._process_data(start_ts, self.end_ts, event.packet)
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****    File "/usr/share/weewx/user/MQTTSubscribe.py", line 289, in _process_data
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****      accumulator.addRecord(archive_data)
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****    File "/usr/share/weewx/weewx/accum.py", line 256, in addRecord
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****      func(self, record, obs_type, add_hilo, weight)
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****    File "/usr/share/weewx/weewx/accum.py", line 314, in add_value
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****      self[obs_type].addHiLo(val, record['dateTime'])
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****    File "/usr/share/weewx/weewx/accum.py", line 77, in addHiLo
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****      raise ValueError("accum: ScalarStats.addHiLo expected float or int, got %s" % val)
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****  ValueError: accum: ScalarStats.addHiLo expected float or int, got 13.5
Apr 25 21:28:02 weewx-hyggebu weewx[30607]:     ****  Exiting.

rgds
Atle
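
On the result code 5 mentioned at the top of this post: in MQTT 3.1.1 the broker's reply to a connection attempt carries a return code, and 5 means the connection was refused as not authorized. A small lookup like the one below can make such log lines easier to read. The function name describe_connack is made up; the code meanings are taken from the MQTT 3.1.1 specification.

```python
# CONNACK return codes as defined by MQTT 3.1.1.
CONNACK_CODES = {
    0: "Connection accepted",
    1: "Connection refused: unacceptable protocol version",
    2: "Connection refused: identifier rejected",
    3: "Connection refused: server unavailable",
    4: "Connection refused: bad user name or password",
    5: "Connection refused: not authorized",
}


def describe_connack(rc):
    """Return a readable description for a connect result code."""
    return CONNACK_CODES.get(rc, "Unknown result code %d" % rc)


message = describe_connack(5)
```

In this thread, code 5 turned out to be caused by a wrong password, so either 4 or 5 is a good reason to check the credentials first.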




Rich Bell

Apr 25, 2019, 3:45:54 PM
to weewx...@googlegroups.com
I was just going to ask about the password. After the problem with keepalive, I was afraid this bug would show up. My guess is that I need to convert the payload to float. If you want to try patching it, around line 208 change from

data[self.label_map.get(key,key)] = msg.payload

to

data[self.label_map.get(key,key)] = to_float(msg.payload)


Right now I don’t have a great environment for the “individual”  payload type, so it will take a bit to really dig in on this.

Thanks for your help. Rich
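
A self-contained sketch of the patch suggested above, for anyone who wants to see the effect in isolation. The to_float defined here is a local stand-in written for this example, not the WeeWX helper of the same name, and add_observation is a made-up wrapper around the one line being patched. It assumes the payload arrives as bytes or as a string.

```python
def to_float(value):
    """Local stand-in: convert a payload to float, passing None through."""
    if value is None:
        return None
    if isinstance(value, bytes):
        value = value.decode("utf-8")
    return float(value)


def add_observation(data, label_map, key, payload):
    # Same shape as the patched line: map the name, convert the value.
    data[label_map.get(key, key)] = to_float(payload)
    return data


data = add_observation({}, {"temp2": "extraTemp2"}, "temp2", b"13.5")
```

With the value stored as a float rather than as text, the accumulator's check for "float or int" no longer has a reason to reject it.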



bell...@gmail.com

Apr 25, 2019, 3:56:36 PM
to weewx-user
I saw some drivers, but did not find any services that processed MQTT. So, this was developed as a service to augment data from a weather station. I also had my own driver that shared much of the code, so it just made sense to merge the two.
When running as a driver the one difference is that a specific topic can be treated as archive records. I use this to publish loop and archive data from my “production” install and get the exact loop and archive data on other machines, like development, dedicated reporting, etc.
Rich

Eugen66

Apr 25, 2019, 4:31:02 PM
to weewx-user
Thanks :)
That moved it on...
It's running ;) but it seems that I can't change or set the units to metric:
 # Units for MQTT payloads without unit value
    unit_system_name = METRIC  # US or 'METRIC' or 'METRICWX'
From the log:

Apr 25 22:27:25 weewx-hyggebu weewx[4449]: MQTTSS: Processing: dateTime: 1556224039.79, extraTemp2: 13.4, usUnits: 1
Apr 25 22:27:25 weewx-hyggebu weewx[4449]: MQTTSS: Processing: dateTime: 1556224039.89, outHumidity: 32.6, usUnits: 1
Apr 25 22:27:25 weewx-hyggebu weewx[4449]: MQTTSS: Data prior to conversion is: dateTime: 1556224042, extraTemp2: 13.4, outHumidity: 32.6, usUnits: 1
Apr 25 22:27:25 weewx-hyggebu weewx[4449]: MQTTSS: Data after to conversion is: dateTime: 1556224042, extraTemp2: -10.3333333333, outHumidity: 32.6, usUnits: 16
Apr 25 22:27:25 weewx-hyggebu weewx[4449]: MQTTSS: Record prior to update is: dateTime: 1556224042, outTempBatteryStatus: 0, usUnits: 16, UV: 0.0
Apr 25 22:27:25 weewx-hyggebu weewx[4449]: MQTTSS: Packet after update is: dateTime: 1556224042, extraTemp2: -10.3333333333, outHumidity: 32.6, outTempBatteryStatus: 0, usUnits: 16, UV: 0.0

regards
Atle

Eugen66

Apr 25, 2019, 5:03:19 PM
to weewx-user
Thank you so much :)

I have tried all the options for the MQTT units, but it seems that nothing changes...

I was wondering, is it possible  for instance to have a "mapping" in the conf file to map values to specific and different topics, inTemp = sensor/inTemp , outTemp= Weather/outTemp as an example?

best regards
Atle


Rich Bell

Apr 25, 2019, 5:45:05 PM
to weewx...@googlegroups.com
Looks like a documentation error. Try
unit_system = METRIC
Sorry about that. -rich
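
For anyone puzzled by the -10.33 in the log earlier in the thread, the arithmetic can be checked as follows. The incoming 13.4 was tagged usUnits: 1 (US), so it was treated as degrees Fahrenheit and converted to the station's usUnits: 16 (METRIC). The function name is made up for this example.

```python
def fahrenheit_to_celsius(value_f):
    """Standard Fahrenheit to Celsius conversion."""
    return (value_f - 32.0) * 5.0 / 9.0


# 13.4 was really degrees Celsius, but was treated as Fahrenheit.
misread = fahrenheit_to_celsius(13.4)
```

The result is about -10.33, which matches the extraTemp2 value in the "after conversion" log line. Once the unit option is picked up as METRIC, no conversion should be needed and the value should stay at 13.4.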


Rich Bell

Apr 25, 2019, 7:42:54 PM
to weewx...@googlegroups.com
It’s software, so almost anything is possible :)
Is the use case that the same “field name” might be in different topics? Something like this, topic indoor/temp maps to inTemp and outdoor/temp maps to outTemp.  If so, I’d probably add an option to concatenate the topic into the field name and use the current mapping functionality. This would mean that all input would need a mapping, because all “field names” would be prepended with the higher/parent topic.
But, with that said, my focus is on adding tests to the test suite before adding more functionality.
- rich
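
A rough sketch of the idea floated above, to show why every input would then need a mapping. Nothing here exists in MQTTSubscribe: the function name field_name and the use of "/" as the separator are assumptions made for illustration.

```python
def field_name(parent_topic, key, label_map):
    """Prefix the key with its parent topic, then apply the label map."""
    full_name = "%s/%s" % (parent_topic, key)
    return label_map.get(full_name, full_name)


label_map = {"indoor/temp": "inTemp", "outdoor/temp": "outTemp"}

mapped_in = field_name("indoor", "temp", label_map)
mapped_out = field_name("outdoor", "temp", label_map)
# No mapping entry: the prefixed name is not a WeeWX field name.
unmapped = field_name("garage", "temp", label_map)
```

Here the first two calls give inTemp and outTemp, while the third gives garage/temp, which is the downside mentioned above.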

On Thu, Apr 25, 2019 at 4:42 PM Eugen66 <hage...@gmail.com> wrote:
I was missing ' at the unit selection...

Thank you so much :)
I was wondering, is it possible  for instance to have a "mapping" in the conf file to map values to specific and different topics, inTemp = sensor/inTemp , outTemp= Weather/outTemp as an example?

best regards
Atle



Eugen66

Apr 26, 2019, 2:42:27 AM
to weewx-user
Thank you again, that did it :)
Yes, that's what I meant, because I have different sensors and they do not publish to Weather/. For instance, I have a barometer that publishes to SPA/Barometer_SPA; I could change this one, but it is used for something else, so I'd rather not. I think this service is a great way to add different sensors and have the flexibility to subscribe to different topics.
I would actually like to use the SDR-RTL to publish everything to MQTT and let WeeWX subscribe to that, along with other home automation.
best regards
Atle

Rich Bell

Apr 26, 2019, 7:55:04 AM
to weewx...@googlegroups.com
I was able to recreate it. The latest commit, v1.0.2, has this fix.
Still working on correctly handling config options (keepalive bug).
- Rich

bell...@gmail.com

Apr 26, 2019, 9:55:51 AM
to weewx-user
I opened an issue for this feature. It is here
It should be the next feature I work on. But I do need to do some additional code cleanup around bad data, edge cases, etc.
Rich

Eugen66

Apr 27, 2019, 6:20:58 AM
to weewx-user
Thank you,
I'm sorry, but I did not give a good example. I was thinking of something similar to the sensor map, so "mapping" directly to a topic, like in the SDR driver:
[[sensor_map]]
        inTemp = temperature.8:1.AcuriteWT450Packet
        outTemp = temperature.1:1.AcuriteWT450Packet
        extraTemp1 = temperature.4:5.HidekiWindPacket
        rain_total = rain_total.4:0.HidekiRainPacket
        
example:
[[sensor_map]]
        inTemp = Weather/temperatur1
        outTemp = SPA/Outtemp
        extraTemp1 = sensor/ute/temp
        rain_total = outdoor/rain

best regards
Atle
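
To illustrate the request above, here is a sketch of how a field-to-topic map like this could be used when a message arrives. It is only an illustration of the idea, not a feature of MQTTSubscribe; the function name handle_message is made up and the topics are the ones from the example.

```python
# Field name -> topic, as in the example map above.
sensor_map = {
    "inTemp": "Weather/temperatur1",
    "outTemp": "SPA/Outtemp",
    "extraTemp1": "sensor/ute/temp",
    "rain_total": "outdoor/rain",
}

# Invert it once so an incoming topic can be looked up directly.
topic_to_field = {topic: field for field, topic in sensor_map.items()}


def handle_message(topic, payload, data):
    """Store the payload under the mapped field; ignore unmapped topics."""
    field = topic_to_field.get(topic)
    if field is not None:
        data[field] = float(payload.decode("utf-8"))
    return data


data = handle_message("SPA/Outtemp", b"4.5", {})
data = handle_message("some/other/topic", b"1.0", data)
```

Only topics listed in the map contribute values, so sensors that publish elsewhere for other purposes can keep their existing topics.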

Rich Bell

Apr 28, 2019, 8:10:10 PM
to weewx...@googlegroups.com
Atle,
Thanks for the additional information. I’d really like to move the conversation to the issue I’m using to track the development, 

And thanks for taking the leap and trying this. Your help and feedback are greatly appreciated.
-rich