weewx-sdr driver modified for rtl-sdr plus extra ttyACM1 input; getting gaps


William Garber

Apr 25, 2023, 1:12:15 AM
to weewx-user
I have a weewx-sdr driver receiving data over USB from an rtl-sdr radio receiver dongle.  The radio receiver gets outdoor data from an Acurite Atlas.  The indoor weather data comes from an arduino over USB.  The weewx server is on a raspberry pi.  I modified the weewx-sdr driver's AsyncReader (at the beginning of the file) and ProcessManager.  There is now one AsyncReader thread for the rtl433 ttyACM0 connection and another for the arduino, both reading data over USB.  They share the same queue, where they put both indoor and outdoor lines of input data.
The driver only accepts lines formatted as JSON and rejects the other lines, which are for debugging.  A lot of extra data is discarded.
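
The JSON-only filtering described above might look roughly like the following. This is a minimal sketch, not code from weewx-sdr: the function name `keep_json` is hypothetical, and it assumes each line arrives as bytes and that debug lines never start with `{`.

```python
import json

def keep_json(raw_line):
    """Return the parsed dict for a JSON line, or None for anything else."""
    try:
        line = raw_line.decode("utf-8").strip()
    except UnicodeDecodeError:
        return None                      # not text at all, discard
    if not line.startswith("{"):
        return None                      # debug output, discard
    try:
        obj = json.loads(line)
    except ValueError:
        return None                      # looked like JSON but was truncated or broken
    return obj if isinstance(obj, dict) else None

# Made-up sample input: one debug line, one good line, one broken line.
lines = [b"debug: sensor ok\n", b'{"inTemp": 60.0, "inHumidity": 40}\n', b"{broken\n"]
parsed = [p for p in (keep_json(l) for l in lines) if p is not None]
```

Only the middle line survives, so anything downstream sees one dict per valid reading.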

It looks like there may simply be so much data that the weewx server sometimes skips some of it.  I get some records with nothing but None except for rain.

The archive interval is 5 minutes.  The Atlas and the arduino both generate data every 7 seconds or so.  The arduino was much faster and I got more gaps.  When I slowed the arduino down to the speed of the Atlas there were fewer gaps.

Maybe I should just increase the archive interval?  Or decrease it?
I don't know how to change the Atlas logging interval.

William Garber

Apr 25, 2023, 1:31:21 AM
to weewx-user
jjj.log.gz is the output of journalctl -b -u weewx_atlas.service
typescript is an SQL query of the database weewx_atlas.sdb showing a gap at 9:15pm.
Could garbage collection have caused the gap?
typescript
jjj.log.gz

William Garber

Apr 25, 2023, 2:34:22 AM
to weewx-user
The modification to AsyncReader adds a write mode so that weewx acts as a timeserver.  It is triggered by the sentinel "request_datetime".
The timeserver was not called during the gap at 9:15pm shown in the attachment and log from the previous post.  Search the log for "AsyncReader".


    def run_read_mode(self):
        # fixme I think this should be longer than the loop time for the arduino                    
        timeout = 12
        # select(rlist,wlist,xlist,timeout); wait until ready for reading (rlist);                  
        ready,_,_ = select.select([self._fd], [], [], timeout)
        if not ready:
            return
        line = self._fd.readline()
        # fixme verbose                                                                            
        logdbg("read line = '%s'" % (line))
        line = line.rstrip()
        # all our files are binary                                                                  
        line = line.decode()
        if not line:
            return
        if self._write_mode and (line == "request_datetime"):
            self._read_mode = False
            self._write_attempt = 0
            return
        # garberw begin -----------------------                                                    
        # we are not processing any lines other than json so just keep only json;                  
        # this saves a little work until we get to PacketFactory.create;                            
        # to see the debugging info comment this line out;                                          
        # PacketFactory.parse_json handles this line;                                              
        # PacketFactory.parse_text handles the other lines; we never use that;                      
        #if not line.startswith("{"):                                                              
        #    return                                                                                
        # garberw end -----------------------                                                      
        # line = fudge_time()                                                                      
        self._queue.put(line)

    def run_write_mode(self):
        # fixme check this                                                                          
        timeout = 12
        # select(rlist,wlist,xlist,timeout); wait until ready for writing (wlist);                  
        _,ready,_ = select.select([], [self._fd], [], timeout)
        if not ready:
            self._write_attempt += 1
            if self._write_attempt >= self.WRITE_ATTEMPT_MAX:
                logdbg("AsyncReader write timeout")
                self._read_mode = True
            return
        # int(1677178029.12345) seconds since epoch                                                
        time_now_epoch = int(time.time())
        # '1677178029\n'                                                                            
        time_str_epoch = '{}\n'.format(time_now_epoch)
        # b'1677178029\n'                                                                          
        buf = bytes(time_str_epoch, 'ascii')
        self._fd.write(buf)
        self._fd.flush()
        loginf("AsyncReader wrote timestamp={} buf={}".format(time_now_epoch, buf))
        self._read_mode = True

Maybe the problem is that the queue in AsyncReader gets too full before ProcessManager.get_stdout()
yields all the lines to PacketFactory.create(), which parses the JSON in them and
pops each line off the list "lines".  Sort of like flushing the input buffer.
ProcessManager.get_stdout() yields the lines immediately if they contain a timestamp matching
TS = re.compile('^\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d[\s]+')
Maybe this would "flush the input buffer sooner", but it doesn't work for me since I am using unix epoch time as an integer,
not formatted like TS.
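
To illustrate the mismatch, here is a small check of the TS pattern against two sample lines. The pattern is the one quoted above, written as a raw string; both sample lines are made up for the example and are not taken from the logs.

```python
import re

# Same pattern as quoted from the driver, written as a raw string.
TS = re.compile(r'^\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d[\s]+')

# Hypothetical lines: one with a formatted timestamp prefix, one with epoch time.
formatted = '2023-04-25 21:15:00 {"model": "example"}'
epoch_style = '{"dateTime": 1682396100, "inTemp": 60.0}'

matches_formatted = TS.search(formatted) is not None
matches_epoch = TS.search(epoch_style) is not None
```

The formatted line matches and the epoch-style line does not, which is consistent with the epoch lines never taking the "yield immediately" path.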

William Garber

Apr 25, 2023, 11:22:12 AM
to weewx-user
jjj.2.edited.a.grep_packet.log shows all the new packet information between 6:45am and 6:55am.
typescript.4.edited shows the database dump of weewx_atlas.sdb at 6:50am, when there was a gap (None / NULL values for data).
You can see that weewx should have got a full set of data for that archive interval.  The archive interval is 300 seconds = 5 minutes.
Could it be that there is just too much data?  About half of the data is from the Atlas and half from the indoor sensors.
Weewx can handle the data from the Atlas.  The only difference is that the weewx-sdr driver removes duplicate data from the Atlas,
but there are no duplicates from the indoor sensors.
typescript.4.edited
jjj.2.edited.a.grep_packet.log

William Garber

Apr 25, 2023, 8:38:39 PM
to weewx-user
I am trying to figure out what happens if packets from the previous archive interval have somehow not been processed yet and arrive in the current archive interval.
In StdArchive.new_loop_packet this would raise weewx.accum.OutOfSpan, but then it would create a new accumulator, assuming it had moved on to the next interval
(not that it was still stuck in the last interval).  This could not happen unless the packets arrived out of sequence, with an old timestamp after a new timestamp, which is possible with two data sources like mine.
You can "cheat" and, instead of using an RTC (real time clock) to get the timestamp, just have the driver timestamp each packet with the time it received it.  The drift on my RTC is usually only a couple of msec per hour.
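
The effect of one late packet can be sketched with a toy model. This is a simplified illustration of the behaviour described above, not WeeWX's actual accumulator code: `span_for` and `replay` are hypothetical names, and the model assumes each archive span is the half-open interval (start, stop] and that any packet outside the current span closes it and opens a new one.

```python
ARCHIVE_INTERVAL = 300  # seconds, as in this thread

def span_for(ts, interval=ARCHIVE_INTERVAL):
    """Return the (start, stop] span that contains integer timestamp ts."""
    start = (ts - 1) // interval * interval
    return (start, start + interval)

def replay(timestamps):
    """Replay packets in arrival order; return a list of (span, packet_count)."""
    closed = []
    current, count = None, 0
    for ts in timestamps:
        span = span_for(ts)
        if span != current:
            # Out of span: close the current accumulator and start a new one.
            if current is not None:
                closed.append((current, count))
            current, count = span, 0
        count += 1
    if current is not None:
        closed.append((current, count))
    return closed

in_order = replay([290, 295, 299, 302, 305])
out_of_order = replay([290, 295, 302, 299, 305])
```

With the packets in order the model produces two spans holding 3 and 2 packets. With 299 arriving after 302 it produces four fragments, two of them holding a single packet, which is the kind of thinly populated interval that could show up as a gap.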

William Garber

Apr 25, 2023, 11:56:13 PM
to weewx-user
typescript attachment is database values showing gaps for example at timestamps (unix epoch time):
1682396100
1682409000
1682421300
1682429100
1682430600
1682434200
1682443200
1682460000
1682461800

j01 shows all timestamps extracted from
$ journalctl -b -u weewx_atlas --since 2023-04-25
j02 is the same thing sorted.
Using emacs "M-x ediff-files" or just plain "diff" you can see that all of the gaps are within a few milliseconds (about 5 msec) of a packet arriving out of order, i.e. getting stuck in the wrong interval.
Other packets arrive out of order and don't cause gaps, but this may be because they are not near the edge of the interval (5 minutes).

So I have to figure out what happens to the packets stuck in the previous interval.
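
The sorted-versus-unsorted comparison can also be done in a few lines of Python. This is only a sketch: the function names are hypothetical and the timestamps below are made up to resemble the situation described, not copied from the attached logs.

```python
def find_out_of_order(timestamps):
    """Return (index, ts) for every timestamp smaller than its predecessor."""
    pairs = zip(timestamps, timestamps[1:])
    return [(i, ts) for i, (prev, ts) in enumerate(pairs, start=1) if ts < prev]

def distance_to_boundary(ts, interval=300):
    """Seconds between ts and the nearest multiple of interval."""
    offset = ts % interval
    return min(offset, interval - offset)

# Made-up arrival order around the archive boundary at 1682396100.
arrivals = [1682396086.0, 1682396093.0, 1682396100.004, 1682396099.999, 1682396107.0]
late = find_out_of_order(arrivals)
near_edge = [(i, ts) for i, ts in late if distance_to_boundary(ts) < 0.01]
```

Packets that land in `late` but not in `near_edge` would correspond to the out-of-order arrivals that did not cause gaps.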
j02.timestamp.sorted.log
j01.timestamp.unsorted.log
typescript.edited.log

gjr80

Apr 26, 2023, 1:16:18 AM
to weewx-user
I'm not sure if you are providing a running commentary or seeking help. If the latter then I have no idea where to start. Personally, I think your architecture is way too complex; you appear to be running a highly modified driver that seeks to amalgamate data from two sources. I imagine you will strike all sorts of corner cases depending on what arrives when. I would also question the utility of reading indoor obs every seven seconds, seems way too frequent to me. All told I doubt you are going to find too many folks here to help.

Why not run a standard sdr driver to feed WeeWX with loop packets from the Atlas, with a simple, non-threaded data service bound to the new loop packet arrival to read the 'indoor data' (I assume this is pressure, temperature and humidity) and augment the loop packet? Far more modular and easier to test/develop (and get help), you will be running a standard sdr driver, and since you are already getting your hands dirty modifying the sdr driver, writing a small data service to handle the Arduino input should be a walk in the park. If the Arduino is serially connected to the RPi you should not have great latency in accessing data, so a suitably short timeout on the serial reads should provide you with your indoor data without blocking the main WeeWX thread. Once proved, the serial read could be moved to a thread if you really had the need.

Gary

William Garber

Apr 26, 2023, 11:24:52 AM
to weewx-user
Thank you.  I expect you are referring to the Customization guide section "Customizing the WeeWX service engine; Adding a second data source".  I will try doing it that way.  Thanks again.

gjr80

Apr 26, 2023, 9:12:50 PM
to weewx-user
Yes, that section covers it fairly well. The two main things to watch are not delaying the main WeeWX engine loop and ensuring the data you add to the loop packet follows the unit system used in the loop packet. Delaying the main loop is often associated with accessing data via the internet or some other network. A common approach is to develop the data service without using its own thread; this makes debugging much easier. If delay is an issue you can move the service to its own thread later. Unit consistency is achieved by checking the usUnits field in the loop packet and then converting your service's data to the correct units. This is important for obs whose units vary across unit systems (eg temperature) but not an issue if the same unit is used across all unit systems (eg wind direction). A robust, well written data service will not assume the loop packet always uses the same unit system.
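
The unit check can be illustrated without WeeWX installed. The following is a hand-rolled sketch under stated assumptions: `match_units` is a hypothetical helper, the indoor data is assumed to arrive in degree C, and only temperature is converted. In a real service the converters in weewx.units would normally do this work.

```python
# Numeric values of weewx.US, weewx.METRIC and weewx.METRICWX.
US, METRIC, METRICWX = 1, 16, 17

def match_units(indoor, packet_units):
    """Return indoor data (given in METRIC) expressed in packet_units."""
    out = dict(indoor)
    if packet_units == US and out.get("inTemp") is not None:
        out["inTemp"] = out["inTemp"] * 9.0 / 5.0 + 32.0   # degree C -> degree F
    # METRIC and METRICWX both use degree C, and humidity is percent in every
    # unit system, so nothing else needs converting in this small example.
    out["usUnits"] = packet_units
    return out

indoor = {"usUnits": METRIC, "inTemp": 20.0, "inHumidity": 40.0}
as_us = match_units(indoor, US)
as_metricwx = match_units(indoor, METRICWX)
```

The point of the sketch is the branch on the packet's unit system: the same indoor reading becomes 68.0 for a US packet and stays 20.0 for a METRICWX packet.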

Gary

William Garber

Apr 27, 2023, 1:08:42 AM
to weewx-user
here is my service.  In this version I took your advice and decided to use the weewx-sdr driver completely unmodified and put all my stuff in a service.  Sorry the weewx-sdr driver (the original, not my version) reads its input from AsyncReader (not my modification).  That uses a thread.  That's what I was referring to.  But since I already had my (slightly modified) version of AsyncReader from before I spoke to you, I reused it in the service (see the attachment).  It has a queue.  It stores lines of input from the arduino over usb and discards any which are not json.  This is completely separate from the weewx-sdr and its AsyncReader.  Plus this AsyncReader can also be set up as a writer which enables me to have the option of a simple time server on the raspberry pi to set the RTC on the arduino if needed. 

Anyway I could always get rid of the AsyncReader and just use serial.readline().

Perhaps the example in the weewx customization guide is for a one-shot read of the indoors sensors.  Instead I made it loop over the lines of input in the queue and use an accumulator to average them.

The callback for NEW_ARCHIVE_RECORD for my service would (I think) be called before StdArchive.new_archive_record().  My service only has access to the event and event.record.  Does it also have access to StdArchive.old_accumulator ?
In StdArchive.new_archive_record(event)   it gets the event which was modified by my service so the record includes my data.

So my question is what about old_accumulator in StdArchive.new_archive_record() ?  Is it strictly optional ?  That accumulator only has the outside data not the data added by my new_archive_record() callback.
Of course my data is already averaged. 
In StdArchive.new_archive_record() the event.record has more observation types than the old_accumulator.  Does this pose a problem ?

What do you mean by augmenting?  Does this involve choosing self.record_generation = hardware?
weewx_atlas.py

gjr80

Apr 27, 2023, 2:24:36 AM
to weewx-user
Way more complex than I think is needed. I would see you using the SDR driver as your driver and nothing else. The example cited in the Customisation Guide does augment archive records rather than loop packets; in your case I would expect you are better served by augmenting loop packets. You don't necessarily need to augment every loop packet (if your loop packets are coming in every 7 seconds from SDR) but you need to make sure you get a good few in each archive period. You should have no need to deal with queues or accumulators in your service; the queue is a hangover of the threaded SDR driver and the WeeWX engine should handle everything to do with accumulators. At the end of the day you want to do two things: (1) feed WeeWX with a loop packet stream - the SDR driver does that and (2) augment loop packets with Arduino data - your service does that. If you do these two things WeeWX will take care of generating archive records, reports etc.

For the Arduino, can't you just read the Arduino data direct from the serial port and decode/parse/process it in your service? (there seem to be plenty of examples around the traps on this).

The sort of service I had in mind is something like (untested of course):

import weewx
import weewx.engine
import weewx.units

class ArduinoService(weewx.engine.StdService):
    """Data service to augment loop packets with data from an Arduino.

    We will augment loop packets with obs obtained from the Arduino. The WeeWX
    engine will take care of accumulating this data and generating archive
    records.
    """

    def __init__(self, engine, config_dict):

        # initialise my parent class
        super(ArduinoService, self).__init__(engine, config_dict)

        # include whatever initialisation you need to speak to the Arduino
        < some python code to initialise any properties etc>
        # we will augment loop packets so bind ourself to any new loop packet
        # events
        self.bind(weewx.NEW_LOOP_PACKET, self.new_loop_packet)

    def new_loop_packet(self, event):
        """Augment a loop packet with data obtained from the Arduino."""

        # obtain a dict of data from the Arduino
        _ard_data_dict = self.get_arduino_data()

        # do we have any data
        if _ard_data_dict is not None:
            # convert the Arduino data to the same unit system as used in the
            # loop packet, first obtain a suitable Converter
            converter = weewx.units.StdUnitConverters[event.packet['usUnits']]
            # now convert the Arduino data to the same unit system as used in
            # the loop packet
            _conv_data = converter.convertDict(_ard_data_dict)
            # and finally update the loop packet with our converted data
            event.packet.update(_conv_data)

    def get_arduino_data(self):
        """Get a data dict from the Arduino.

        Obtain any data from the Arduino, parse the data and return as a dict
        using a known unit system. If no data could be obtained return None.
        """

        # some code to obtain the data from the Arduino, this code needs to be
        # carefully constructed to not block
        < python code to read data from the Arduino>

        if <we have Arduino data>:
            # create an empty dict with a pre-populated field usUnits to hold
            # our parsed Arduino data. The usUnits value is arbitrary, but must
            # be one of weewx.US (1), weewx.METRIC (16) or weewx.METRICWX (17).
            # Choose a convenient value, eg if the Arduino provides data in
            # metric use weewx.METRIC (16), if US customary use weewx.US, if a
            # mix of both then pick one and we will need to do some manual
            # conversion for the odd fields. In this case we will use
            # weewx.METRIC
            _data_dict = {'usUnits': weewx.METRIC}

            # populate the dict, assuming here the Arduino is providing WeeWX
            # fields inTemp, inHumidity and barometer (could be 'pressure' not
            # 'barometer')
            _data_dict['inTemp'] = <some code to provide inside temperature in degree C>
            _data_dict['inHumidity'] = <some code to provide inside humidity in percent>
            _data_dict['barometer'] = <some code to provide barometer in hPa>
            # we have our data dict, now return it
            return _data_dict
        # we could not get any data from the Arduino so return None
        return None

This is fairly bare bones and you might want to put in a few pieces of code to provide log output when debug = 1 (or 2 or 3) but hopefully you get the idea

Gary

William Garber

Apr 29, 2023, 11:42:56 PM
to weewx-user
Just one question please :-).  Suppose the read of the arduino could possibly take a relatively long time, and you want to have a timeout after which it gives up and saves None/NULL for the indoor data. 

What is the max timeout that would be reasonable relative to the archive interval ? What happens if the delay goes past the end of the main archive interval the event you are handling was in?

I think what is happening is I usually get very fast reads from the arduino but once in a while I get a really slow one.       Could be something independent like me reading the database while weewx is trying to write it?

gjr80

Apr 30, 2023, 7:47:56 PM
to weewx-user
On Sunday, 30 April 2023 at 13:42:56 UTC+10 william...@att.net wrote:
> Just one question please :-).  Suppose the read of the arduino could possibly take a relatively long time, and you want to have a timeout after which it gives up and saves None/NULL for the indoor data.
>
> What is the max timeout that would be reasonable relative to the archive interval ?
 
I would be more concerned about the loop packet interval than the archive interval. Your initial post indicated the Atlas emits packets every 7 seconds so the SDR driver will be emitting loop packets every 7 odd seconds. If you have a plain vanilla WeeWX install WeeWX will not be doing much else during the loop packet interval other than calculating derived obs so dwelling for up to, say, a second should have no significant effect. 

> What happens if the delay goes past the end of the main archive interval the event you are handling was in?
 
Delaying past the end of the archive interval will not have a significant impact (within reason). Say your service delays 20 seconds past the end of the archive period; when the driver gets its turn again it will emit another loop packet, which will cause an archive record to be generated by WeeWX, and ultimately the report cycle is run maybe 20+ seconds later than usual (note the exact behaviour of the driver is very much driver dependent; some drivers may skip loop packets, others may emit a loop packet immediately and yet others may delay emitting a loop packet - the SDR driver is threaded and I believe it is the former). So really you will probably only notice delayed report output.

Where you will probably get more problems from delaying the WeeWX main loop is in the generation/processing of loop packets. As mentioned above, driver behaviour varies from driver to driver. For example, the vantage driver obtains loop packets every 2 odd seconds; if a loop packet is missed it is gone forever. Other drivers poll the hardware much less frequently, say every 50 odd seconds; in that case an entire minute might go by with no data. The consequences of a missed loop packet depend on the system config. A vantage station with a five minute archive interval would see around 120 loop packets per archive period, so the loss of one loop packet will have no real impact. Consider the second system with loop packets arriving every, say, 50 seconds; if it had an archive interval of one minute, you could conceivably see no loop packets in an archive interval and hence no archive record is generated and no report cycle occurs.
 
Remember loop packet data is accumulated by WeeWX and many obs in the resulting archive record are simply the average value of the obs from all loop packets seen during the archive interval. So for slow changing obs, such as air temperature/atmospheric pressure, losing the odd loop packet during an archive interval will have no real impact on the archive record data provided there are numerous other loop packets received in the archive interval. This may not be the case for rapidly changing obs such as wind speed and direction or cumulative obs such as rainfall, in these cases it may be important not to lose loop packets.
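
A toy calculation shows the difference between averaged and cumulative obs. This is a sketch, not WeeWX's accumulator: `archive_summary` is a hypothetical function, the packet values are made up, and it assumes each loop packet carries the rain that fell since the previous packet.

```python
def archive_summary(loop_packets):
    """Average outTemp and total rain over the loop packets of one interval."""
    temps = [p["outTemp"] for p in loop_packets if p.get("outTemp") is not None]
    rains = [p["rain"] for p in loop_packets if p.get("rain") is not None]
    return {
        "outTemp": sum(temps) / len(temps) if temps else None,
        "rain": sum(rains) if rains else None,
    }

# 42 packets: roughly one every 7 seconds over a 5 minute archive interval.
packets = [{"outTemp": 20.0 + 0.01 * i, "rain": 0.0} for i in range(42)]
packets[10]["rain"] = 0.5            # made-up rain amount in a single packet

full = archive_summary(packets)
dropped = archive_summary(packets[:10] + packets[11:])   # lose that one packet
```

Dropping the packet moves the average temperature by only a few thousandths of a degree, but the rain total for the interval goes from 0.5 to 0.0.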

One way to deal with sources that have significant latency is to place the code that interacts with the source in its own thread; this usually entails one or more queues to pass data to the driver/service, which adds complexity. The non-threaded approach has simplicity, but risks delaying the WeeWX main loop.

> I think what is happening is I usually get very fast reads from the arduino but once in a while I get a really slow one.  Could be something independent like me reading the database while weewx is trying to write it?
 
Possible I guess, if WeeWX was doing some substantial report generation at the time, otherwise not likely. Do the slow response occasions align with WeeWX report generation? What happens if you take WeeWX out of the equation and run a simple python script to poll the Arduino every so many seconds and output the time taken to obtain a response?
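
A standalone timing script along those lines might look like this. It is a minimal sketch: `time_reads` and `slow_reads` are hypothetical names, and a stand-in function replaces the serial port so the example runs without hardware.

```python
import time

def time_reads(readline, count):
    """Call readline() count times; return the elapsed seconds for each call."""
    durations = []
    for _ in range(count):
        start = time.monotonic()
        readline()
        durations.append(time.monotonic() - start)
    return durations

def slow_reads(durations, threshold):
    """Return (index, seconds) for every read slower than threshold."""
    return [(i, d) for i, d in enumerate(durations) if d > threshold]

# Stand-in for the serial port: pretends each line takes about 10 ms to arrive.
def fake_readline():
    time.sleep(0.01)
    return b'{"inTemp": 60.0}\n'

durations = time_reads(fake_readline, 3)
outliers = slow_reads(durations, threshold=1.0)
```

With real hardware one would pass the `readline` method of an open serial port object instead of `fake_readline` (for example from pyserial, which is an assumption here), let it run for a few hours, and compare the times of any outliers with the WeeWX report cycle.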

Gary

William Garber

May 1, 2023, 2:05:37 AM
to weewx-user
Until now the Atlas emitted NEW_LOOP_PACKET events about every 7 seconds, so I had my indoor weather data arduino (wpa) set to emit at the same interval.  I just sped that up to every 3 seconds (just the wpa emitter).  You can "$ cat /dev/ttyACMwa" which prints the serial data output over usb from the arduino (wpa) directly to the linux console.  That shows (the  new interval) is 3 seconds.  I will run it over a long time to see if there are any long intervals.  But what I suspect is happening is ... I have three weather stations.  I built two with Adafruit parts plus the indoors part of this one (wpa/Atlas).  The "merged" reports are run as a fourth instance of weewx which is not the server, just "wee_reports_merge" which is just "wee_report" with the configuration file for the merged report.  The worst thing is that this was being run every 90 seconds (DUH).  I slowed it down to every 10 minutes.  The archive interval for Atlas/wpa is 5 minutes. 

The hard part was reading the serial port to get the inside data (one line of json data).
(1) flush input buffer
(2) read and discard the first line in case it is partial
(3) read next line or timeout
the readline() commands are blocking with timeout.  Not sure how to do non blocking but see this discussion:
they all seem to think that at least part of the procedure is blocking (not all !)
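
The three steps above can be sketched as follows. This is not the attached service: `read_fresh_json` and `FakePort` are hypothetical names, and the port is assumed to behave like a pyserial Serial object opened with a timeout, where reset_input_buffer() drops buffered bytes and readline() returns whatever arrived (possibly nothing) once the timeout expires.

```python
import json

def read_fresh_json(port):
    """Flush, discard a possibly partial line, then parse the next line."""
    port.reset_input_buffer()      # (1) flush input buffer
    port.readline()                # (2) discard first line, it may be partial
    raw = port.readline()          # (3) read next line or time out
    if not raw.endswith(b"\n"):
        return None                # timed out before a full line arrived
    try:
        return json.loads(raw.decode("utf-8"))
    except ValueError:             # also covers UnicodeDecodeError
        return None                # a debug line or damaged JSON

class FakePort:
    """Stand-in for a serial port so the sketch runs without hardware."""
    def __init__(self, lines):
        self._lines = list(lines)
    def reset_input_buffer(self):
        pass
    def readline(self):
        return self._lines.pop(0) if self._lines else b""

port = FakePort([b'mp": 60.0}\n', b'{"inTemp": 61.5}\n'])
reading = read_fresh_json(port)
```

Both readline() calls can block for up to the port timeout, so the worst case for this approach is about two timeouts per loop packet.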

It works so far.  I will leave it to run overnight.  Service attached.  The service processes inside data in NEW_LOOP_PACKET as you described.  weewx uses the service plus the original unmodified weewx-sdr.py driver for outside data.
weewx_atlas.py

William Garber

May 2, 2023, 12:59:36 AM
to weewx-user

here is another version that blocks less.  
weewx_atlas.3.py

William Garber

May 3, 2023, 5:26:21 PM
to weewx-user
Here is the hopefully final version.  It seems to have finally fixed the gaps.  What happened with the version in the previous post was ...
The NEW_LOOP_PACKET callback in the service I wrote waited too long to get the next line of inside (temp press humid) data.  Meanwhile the outside data (from weewx-sdr from the Atlas) got backed up.  You can see this by setting "log_lines=1", which is an option specific to weewx-sdr.  Because there was a lot of data including duplicates from the Atlas, and because the arduino was too slow to keep up with it, I emulated the AsyncReader thread from weewx-sdr.  My version of AsyncReader keeps only the last valid json line.  It can return that line instantly when processing NEW_LOOP_PACKET.  That means none of the Atlas lines back up and none of them get skipped.
So it is completely non-blocking, and this should be as close as I could get to what GJR described.
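
The "keep only the last valid line" idea can be sketched like this. It is an illustration, not the attached weewx_atlas.4.py: the class name `LatestLineReader` is hypothetical, and an in-memory stream stands in for the serial port.

```python
import io
import threading

class LatestLineReader(threading.Thread):
    """Background reader that remembers only the newest JSON-looking line."""

    def __init__(self, stream):
        super().__init__(daemon=True)
        self._stream = stream                  # binary file-like object
        self._latest_lock = threading.Lock()
        self._latest = None

    def run(self):
        # With a real serial port one would loop until told to stop; this
        # sketch simply stops when the stream returns no more data.
        for raw in iter(self._stream.readline, b""):
            line = raw.decode("utf-8", errors="replace").strip()
            if line.startswith("{"):
                with self._latest_lock:
                    self._latest = line        # overwrite: older lines are dropped

    def get_latest(self):
        """Return the newest line at once, without waiting on the device."""
        with self._latest_lock:
            return self._latest

stream = io.BytesIO(b'debug 1\n{"inTemp": 60.0}\ndebug 2\n{"inTemp": 60.5}\n')
reader = LatestLineReader(stream)
reader.start()
reader.join()          # only for this demo; a service would not wait here
latest = reader.get_latest()
```

Because get_latest() only takes a lock and returns a stored string, a NEW_LOOP_PACKET callback built on it never waits on the arduino. The trade-off is that the same reading may be returned more than once if the arduino is slower than the loop packets.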

If you want the arduino code, just make anything that prints a json version of the indoors data such as
{"inTemp": 60.0, "inHumidity": 40, etc. }
these must be the only lines from the arduino beginning with "{".  Everything else is skipped so the arduino can produce as much print statement debugging as you want.
weewx_atlas.4.py