Data logging with pymodbus


Srinath Chakravarthy

Mar 25, 2021, 4:03:03 PM
to pymodbus
Hi, I have purchased a temperature controller and would like to monitor and log data from it using pymodbus.

I have been successful in connecting to the controller through both Python code and the pymodbus REPL. I can also read and write coils and registers.

What I would like to do is have a running loop that monitors the controller while still allowing the user to change data.
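A minimal sketch of one way such a loop could be structured with the synchronous client: a background thread polls at a fixed interval, and a shared lock serializes every bus access so that a write issued from the main thread never interleaves with a poll. The names `PollingMonitor`, `read_fn`, `on_sample`, and `write_fn` are hypothetical; `read_fn` and `write_fn` stand in for whatever pymodbus read and write calls are in use.

```python
import threading
import time


class PollingMonitor(threading.Thread):
    """Poll a device in the background and share one lock with any writer."""

    def __init__(self, read_fn, on_sample, interval=1.0):
        super().__init__(daemon=True)
        self.read_fn = read_fn        # hypothetical: callable doing one Modbus read
        self.on_sample = on_sample    # hypothetical: callable taking (timestamp, value)
        self.interval = interval
        self.lock = threading.Lock()
        self._stop_event = threading.Event()

    def run(self):
        while not self._stop_event.is_set():
            with self.lock:           # only one request on the serial line at a time
                value = self.read_fn()
            self.on_sample(time.time(), value)
            # Sleep for the polling interval, but wake early if stop() is called
            self._stop_event.wait(self.interval)

    def stop(self):
        """Ask the polling loop to finish after the current iteration."""
        self._stop_event.set()

    def write(self, write_fn, *args, **kwargs):
        """Run a user-initiated write under the same lock as the poller."""
        with self.lock:
            return write_fn(*args, **kwargs)
```

The monitor is started with `start()`, user changes go through `write(...)`, and `stop()` followed by `join()` shuts the loop down cleanly.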

I have tried all versions of the async clients and have been unable to get any of them to work.
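One possible way around the async clients (an assumption about what might suit this setup, not something pymodbus prescribes) is to keep the synchronous client and hand each blocking call to a worker thread from an asyncio loop. The sketch below uses only the standard library; `blocking_read` is a hypothetical stand-in for a synchronous Modbus read and returns a fixed value.

```python
import asyncio
import time


def blocking_read():
    """Hypothetical stand-in for a synchronous Modbus read."""
    time.sleep(0.01)  # simulates the serial round trip
    return 295


async def poll(read_fn, interval, n_samples, lock):
    """Collect n_samples readings without blocking the event loop."""
    loop = asyncio.get_running_loop()
    samples = []
    for _ in range(n_samples):
        async with lock:  # serialize bus access with other coroutines
            value = await loop.run_in_executor(None, read_fn)
        samples.append(value)
        await asyncio.sleep(interval)
    return samples


async def main():
    lock = asyncio.Lock()
    return await poll(blocking_read, interval=0.01, n_samples=3, lock=lock)
```

Running `asyncio.run(main())` collects three readings. A coroutine that handles user input could take the same lock before issuing a write, so reads and writes never overlap on the serial line.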

Any help will be much appreciated. 

Cheers
Srinath 

"""
Solo class for reading and writing data from Solo temperature controller
Based on pymodbus that implements Modbus Protocol
All Modbus registers are in the attached document and are hard-coded into this document
"""
__author__ = "Srinath Chakravarthy"
__copyright__ = "2021"

from pymodbus.client.sync import ModbusSerialClient as ModbusClient
# from pymodbus.constants import Endian
# from pymodbus.payload import BinaryPayloadDecoder
import threading
import math


# import time
# import numpy as np
# import json


class Solo(threading.Thread, ModbusClient):
    """
    Defines the instantiation of the temperature controller using a serial port

    Attributes
    ----------
    port : serial port location (default : /dev/ttyUSB0)
        serial port of controller
    """

    def __init__(self, method="ascii", unit=1, **kwargs):
        threading.Thread.__init__(self)
        # --- Sets the Modbus unit id and the initial connection state
        self.unit = unit
        self.connected = False
        # --- Defaults for ascii transport; keyword arguments override them
        settings = dict(port='/dev/ttyUSB0', parity="E", bytesize=7, stopbits=1,
                        timeout=1, baudrate=9600)
        settings.update(kwargs)
        ModbusClient.__init__(self, method, **settings)
        # ModbusClient.__init__ returns None, so alias this instance as the client
        self.client = self
        try:
            self.connected = self.connect()
        except Exception as exc:
            raise ConnectionError('Cannot connect to furnace') from exc
        self.input_types = {"Analog Voltage 0-50 mV": 17,
                            "Analog Current 4 - 20 mA": 16,
                            "Thermocouple TXK": 10,
                            "Thermocouple U": 9,
                            "Thermocouple L": 8,
                            "Thermocouple B": 7,
                            "Thermocouple S": 6,
                            "Thermocouple R": 5,
                            "Thermocouple N": 4,
                            "Thermocouple E": 3,
                            "Thermocouple T": 2,
                            "Thermocouple J": 1,
                            "Thermocouple K": 0,
                            }

    def set_input_type(self, input_type):
        """
        Set the input type
        Valid input types are stored in the self.input_types dictionary
        Done by setting register hex: 1004 -> decimal -> 4100 -> actual -> 44101 to values from 0 - 17
        1) 0 - 50 mV Analog voltage input -> register value -> 17
        2) 4 - 20 mA Analog Current input -> register value -> 16
        3) 0 - 20 mA Current input -> register value -> 15
        4) .... Thermocouple types ... see self.input_types for enumeration of input_types
        """
        # dict.has_key() was removed in Python 3; use the "in" operator instead
        if input_type not in self.input_types:
            raise KeyError("The controller cannot support requested input type")

        return self.write_register(address=4100, value=self.input_types[input_type], unit=self.unit)

    def read_process_value(self):
        """
        Read the register for Process value 0x1000 : 44097 : 4096
        Process value is usually the temperature if a thermocouple is connected to the controller
        but it could be anything depending on what the input is

        This is standard logic for most Modbus controllers:
        The process value actually depends on the P2-3 parameter in the manual
        This sets the decimal precision of the outputs
        2 values are possible, 0 and 1, for the decimal
        0 -> no decimal is used -> example : 295 -> 29
        1 -> one decimal place is used -> example : 295 -> 29.5
        Depending on what we wish to do we can set the precision ...

        TODO : Correctly capture error terms
        :return: res, the process value as a float

        """
        stat = self.read_coils(address=2064, count=8, unit=self.unit)
        pv = self.read_holding_registers(address=4096, count=1, unit=self.unit)
        # The register values live in the response's .registers list
        raw = pv.registers[0]
        if stat.getBit(3):
            # no decimal
            res = float(math.floor(raw / 10.0))
        else:
            # Single decimal
            res = float(raw / 10.0)
        return res


    def read_output_levels(self):
        """
        Read the output levels of the controller.
        First check if output is enabled from status and then get the output for the controller
        --- These are controlled by the bits 2 and 3 of the coils starting from 2048
        Registers are
        1) hex 1012 -> Decimal 4114 -> actual 44115
        2) hex 1013 -> Decimal 4115 -> actual 44116
        :return: list with the two output register values
        """
        res = self.read_holding_registers(address=4114, count=2, unit=self.unit)
        return res.registers

    def stop(self):
        """
        Stop all the runs ... PID/Ramp_Soak
        Reads bit 0 of coil 2064 to see if Ramp_Soak control is enabled
        If Ramp_Soak is not enabled, then the regular control bit 5 is set to false,
        otherwise the Ramp_Soak control bit 6 is set to false
        :return: nothing
        """
        stat = self.read_coils(address=2063, count=8, unit=self.unit)
        if stat.getBit(0):
            # Ramp_soak control is running
            self.write_coil(address=2069, value=0, unit=self.unit)
        else:
            # PID/Manual control is running
            self.write_coil(address=2068, value=0, unit=self.unit)


if __name__ == "__main__":
    controller = Solo()
    print(controller.connected)
    # result = controller.read_coils(address=2048, count=8, unit=controller.unit)

    result = controller.read_holding_registers(address=4119, count=1, unit=controller.unit)
    print(result.registers)
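
The docstrings above convert each register from hex to a zero-based address and then to a 4xxxx reference number, and scale the raw process value by ten. A small sketch of that arithmetic, together with a timestamped CSV row for logging, might look like the following. The helper names (`reference_number`, `scale_process_value`, `log_sample`) are hypothetical, and the 40001 offset is taken from the pattern in the docstrings (0x1000 -> 4096 -> 44097), not from the controller manual.

```python
import csv
import io
import math
from datetime import datetime, timezone

HOLDING_REGISTER_BASE = 40001  # assumed offset, matching 4096 -> 44097 above


def reference_number(address):
    """Map a zero-based holding register address to its 4xxxx reference."""
    return HOLDING_REGISTER_BASE + address


def scale_process_value(raw, use_decimal=True):
    """Scale a raw register value, e.g. 295 -> 29.5 or 295 -> 29.0."""
    if use_decimal:
        return raw / 10.0
    return float(math.floor(raw / 10.0))


def log_sample(writer, raw, use_decimal=True, now=None):
    """Write one (ISO timestamp, scaled value) row to a csv.writer."""
    if now is None:
        now = datetime.now(timezone.utc)
    writer.writerow([now.isoformat(), scale_process_value(raw, use_decimal)])


# Example: log one reading to an in-memory buffer instead of a file
buffer = io.StringIO()
log_sample(csv.writer(buffer), 295)
```

In a real logger the `io.StringIO` buffer would be replaced by a file opened with `newline=""`, and `raw` would come from the first entry of the `registers` list on a read response.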