patternpythonMinor
Simple meter simulator
Viewed 0 times
metersimplesimulator
Problem
I've created a simple python meter simulator which connects to a serial port (in my case
I'm looking for some advice regarding:
To understand how the flow actually works:
Here are two pictures of:
Please ask me if there's anything unclear and I'll try to provide as many details as possible.
```
import serial
import time
# configure the serial connections
serial_object = serial.Serial(
port='COM1',
baudrate=1200,
parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.SEVENBITS
)
serial_object.close()
dict_of_bytes = {
'null_byte': b'',
'new_line_byte': b'\n',
'ack_byte': b'\x06',
'a': b'a',
'etx_byte': b'\x03'
}
dict_of_bytearray = {
'init_command': b'/?35169984!\r\n',
'second_command': b'\x06021\r\n',
'third_command': b'\x06020\r\n'
}
dict_of_responses = {
'init_response': b'/ELS2\\@V8.22 \r\n',
'second_response': b'\x01P0\x02(35169984)\x03m Soll: m',
COM1) listens on the serial port and takes actions based on what it receives.
I'm looking for some advice regarding:
- unnecessary / wrong logic (it looks like I've abused those `while` loops)
- code that might be rewritten in a more concise / pythonic way (maybe some methods to avoid the repeating code)
- I'm not so interested in PEP 8-ing this code, but any advice regarding this is welcome.
To understand how the flow actually works:
- from a third party software (of a real meter) I send a command (let's say
read_register)
- I'm monitoring the raw data that is written on `COM1`, and I can see that the real meter sends a `bytearray` which looks like this: `/?35169984!\r\n`. The raw data that is being read for this specific `bytearray` is `/ELS2\\@V8.22 \r\n`. (There might be a longer communication here, but from now on I think it's easy to figure out how the communication works.)
- after the above communication is done, on the real meter I receive a list of indexes. (in my case - I just have a file that contains the exact same indexes as the real meter sends)
Here are two pictures of:
- written data:
- read data
Please ask me if there's anything unclear and I'll try to provide as many details as possible.
```
import serial
import time
# configure the serial connections
serial_object = serial.Serial(
port='COM1',
baudrate=1200,
parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.SEVENBITS
)
serial_object.close()
dict_of_bytes = {
'null_byte': b'',
'new_line_byte': b'\n',
'ack_byte': b'\x06',
'a': b'a',
'etx_byte': b'\x03'
}
dict_of_bytearray = {
'init_command': b'/?35169984!\r\n',
'second_command': b'\x06021\r\n',
'third_command': b'\x06020\r\n'
}
dict_of_responses = {
'init_response': b'/ELS2\\@V8.22 \r\n',
'second_response': b'\x01P0\x02(35169984)\x03m Soll: m',
Solution
Disclaimer: None of this review has been tested. I've just applied some manipulation of your original code.
Making things simpler
Your
Don't repeat yourself
There is a piece of code you have in different places. It might be worth extracting it in a function on its own:
Re-write your function
Now that some part of the logic is extracted in a function, it is much easier to think about it and/or to rewrite it.
In your code, I'd simply write :
```
def get_bytearray(serial, ending_value):
received_bytearray = bytearray()
while True
Making things simpler
Your `dict_of_*` dictionaries are constant and always used with literal strings. From my point of view, it'd be much clearer to use constants to achieve the same thing:
import serial
import time
# Configure the serial connection used by the simulator.
serial_object = serial.Serial(
port='COM1',
baudrate=1200,
parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.SEVENBITS
)
# The main loop calls serial_object.open() on every pass, so the port is
# closed here first.
# NOTE(review): presumably needed because pySerial opens the port as soon as
# a port name is given to the constructor - confirm against the pySerial docs.
serial_object.close()
# Bytes: single-byte values used as read terminators or replies.
NULL_BYTE = b''  # empty value used to reset the "last byte read" variable
NEW_LINE_BYTE = b'\n'  # terminator of the newline-ended commands
ACK_BYTE = b'\x06'  # written back after THIRD_REPONSE has been received
A = b'a'  # last byte of THIRD_REPONSE; used as that frame's read terminator
ETX_BYTE = b'\x03'  # terminator of the final read in the main loop
# Commands: byte strings the simulator compares received lines against.
INIT_COMMAND = b'/?35169984!\r\n'
SECOND_COMMAND = b'\x06021\r\n'
THIRD_COMMAND = b'\x06020\r\n'
# Responses: INIT_REPONSE and SECOND_REPONSE are written to the port;
# THIRD_REPONSE, despite its name, is compared against *received* data.
# NOTE(review): "REPONSE" is presumably a typo for "RESPONSE"; the names are
# left as they are because the main loop refers to them.
# NOTE(review): the trailing " Soll: m" in SECOND_REPONSE looks like a copied
# monitor annotation rather than payload - confirm against a real capture.
INIT_REPONSE = b'/ELS2\\@V8.22 \r\n'
SECOND_REPONSE = b'\x01P0\x02(35169984)\x03m Soll: m'
THIRD_REPONSE = b'\x01P1\x02(00000000)\x03a'
# Main simulator loop: each pass opens the port, waits for a
# newline-terminated command and reacts to it.
# NOTE(review): this copy has lost its indentation, so which statements are
# nested under which `if`/`while` cannot be read from the text. The comments
# below describe the statements in order only - confirm the structure against
# the original before running.
while True:
serial_object.open()
print('\nListening on serial...\n\n')
# Step 1: collect bytes until a newline arrives.
received_byte = NULL_BYTE
received_bytearray = bytearray()
while received_byte != NEW_LINE_BYTE:
received_byte = serial_object.read(1) # read a single byte from the port
received_bytearray += received_byte # append it to the frame collected so far
# Redundant with the loop condition: both test the same byte.
if received_byte == NEW_LINE_BYTE:
break
# Step 2: on the init command, send the init response and read the next
# newline-terminated line.
if received_bytearray == INIT_COMMAND:
serial_object.write(INIT_REPONSE)
received_byte = NULL_BYTE
received_bytearray = bytearray()
while received_byte != NEW_LINE_BYTE:
received_byte = serial_object.read(1)
received_bytearray += received_byte
if received_byte == NEW_LINE_BYTE:
break
# Step 3a: on SECOND_COMMAND, send SECOND_REPONSE (with a pause before and
# after) and then read until the byte b'a'.
if received_bytearray == SECOND_COMMAND:
time.sleep(0.5)
serial_object.write(SECOND_REPONSE)
time.sleep(0.5)
received_byte = NULL_BYTE
received_bytearray = bytearray()
while received_byte != A:
received_byte = serial_object.read(1)
received_bytearray += received_byte
if received_byte == A:
break
# Step 4: if the frame just read equals THIRD_REPONSE, acknowledge it and
# read the next frame up to ETX.
if received_bytearray == THIRD_REPONSE:
time.sleep(0.5)
serial_object.write(ACK_BYTE)
received_byte = NULL_BYTE
received_bytearray = bytearray()
while received_byte != ETX_BYTE:
received_byte = serial_object.read(1)
received_bytearray += received_byte
if received_byte == ETX_BYTE:
# One more read() after ETX, with no size argument.
# NOTE(review): presumably fetches the single byte that follows ETX -
# confirm pySerial's default read size.
received_bytearray += serial_object.read()
break
# NOTE(review): with indentation lost it is unclear which `if` this `else`
# pairs with; presumably the SECOND_COMMAND check - confirm.
else:
# Step 3b: on THIRD_COMMAND, send the contents of regs.txt and echo them,
# with their length, to stdout.
if received_bytearray == THIRD_COMMAND:
time.sleep(0.5)
with open('regs.txt', 'rb') as fin:
bytes_from_file = fin.read()
serial_object.write(bytes_from_file)
print(bytes_from_file.decode('utf-8'))
print('length: ' + str(len(bytes_from_file)))
serial_object.close()
Don't repeat yourself
There is a piece of code that you repeat in several places. It might be worth extracting it into a function of its own:
def get_bytearray(serial, ending_value):
    """Read from ``serial`` one byte at a time until ``ending_value`` arrives.

    Args:
        serial: An object exposing ``read(size)`` that returns ``bytes``
            (in this script, the opened serial port).
        ending_value: The single-byte ``bytes`` terminator to wait for,
            such as the newline or ETX byte.

    Returns:
        A ``bytearray`` holding every byte read, terminator included.

    An empty read adds nothing and does not match the terminator, so the
    loop simply keeps waiting for more data.
    """
    received_bytearray = bytearray()
    while True:
        # Read from the port that was passed in, not from the module-level
        # ``serial_object``, so the helper works with any port-like object.
        received_byte = serial.read(1)
        received_bytearray += received_byte
        if received_byte == ending_value:
            return received_bytearray
# Main simulator loop, rewritten on top of get_bytearray(): each pass opens
# the port, waits for a newline-terminated command and reacts to it.
# NOTE(review): this copy has lost its indentation, so the nesting of the
# `if`/`else` statements cannot be read from the text - confirm the structure
# against the original before running.
while True:
serial_object.open()
print('\nListening on serial...\n\n')
# Step 1: read one newline-terminated line.
received_bytearray = get_bytearray(serial_object, NEW_LINE_BYTE)
# Step 2: on the init command, send the init response and read the next line.
if received_bytearray == INIT_COMMAND:
serial_object.write(INIT_REPONSE)
received_bytearray = get_bytearray(serial_object, NEW_LINE_BYTE)
# Step 3a: on SECOND_COMMAND, send SECOND_REPONSE (with a pause before and
# after) and then read until the byte b'a'.
if received_bytearray == SECOND_COMMAND:
time.sleep(0.5)
serial_object.write(SECOND_REPONSE)
time.sleep(0.5)
received_bytearray = get_bytearray(serial_object, A)
# Step 4: if the frame just read equals THIRD_REPONSE, acknowledge it and
# read the next frame up to ETX.
if received_bytearray == THIRD_REPONSE:
time.sleep(0.5)
serial_object.write(ACK_BYTE)
# NOTE(review): the expanded version of this loop performs one extra
# serial_object.read() after ETX; get_bytearray() stops at ETX, so that
# trailing byte is left unread here - confirm whether this is intended.
received_bytearray = get_bytearray(serial_object, ETX_BYTE)
# NOTE(review): with indentation lost it is unclear which `if` this `else`
# pairs with; presumably the SECOND_COMMAND check - confirm.
else:
# Step 3b: on THIRD_COMMAND, send the contents of regs.txt and echo them,
# with their length, to stdout.
if received_bytearray == THIRD_COMMAND:
time.sleep(0.5)
with open('regs.txt', 'rb') as fin:
bytes_from_file = fin.read()
serial_object.write(bytes_from_file)
print(bytes_from_file.decode('utf-8'))
print('length: ' + str(len(bytes_from_file)))
serial_object.close()
Re-write your function
Now that part of the logic has been extracted into a function, it is much easier to reason about it and/or to rewrite it.
In your code, I'd simply write:
```
def get_bytearray(serial, ending_value):
received_bytearray = bytearray()
while True
Code Snippets
import serial
import time
# Configure the serial connection used by the simulator.
serial_object = serial.Serial(
port='COM1',
baudrate=1200,
parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.SEVENBITS
)
# The main loop calls serial_object.open() on every pass, so the port is
# closed here first.
# NOTE(review): presumably needed because pySerial opens the port as soon as
# a port name is given to the constructor - confirm against the pySerial docs.
serial_object.close()
# Bytes: single-byte values used as read terminators or replies.
NULL_BYTE = b''  # empty value used to reset the "last byte read" variable
NEW_LINE_BYTE = b'\n'  # terminator of the newline-ended commands
ACK_BYTE = b'\x06'  # written back after THIRD_REPONSE has been received
A = b'a'  # last byte of THIRD_REPONSE; used as that frame's read terminator
ETX_BYTE = b'\x03'  # terminator of the final read in the main loop
# Commands: byte strings the simulator compares received lines against.
INIT_COMMAND = b'/?35169984!\r\n'
SECOND_COMMAND = b'\x06021\r\n'
THIRD_COMMAND = b'\x06020\r\n'
# Responses: INIT_REPONSE and SECOND_REPONSE are written to the port;
# THIRD_REPONSE, despite its name, is compared against *received* data.
# NOTE(review): "REPONSE" is presumably a typo for "RESPONSE"; the names are
# left as they are because the main loop refers to them.
# NOTE(review): the trailing " Soll: m" in SECOND_REPONSE looks like a copied
# monitor annotation rather than payload - confirm against a real capture.
INIT_REPONSE = b'/ELS2\\@V8.22 \r\n'
SECOND_REPONSE = b'\x01P0\x02(35169984)\x03m Soll: m'
THIRD_REPONSE = b'\x01P1\x02(00000000)\x03a'
# Main simulator loop: each pass opens the port, waits for a
# newline-terminated command and reacts to it.
# NOTE(review): this copy has lost its indentation, so which statements are
# nested under which `if`/`while` cannot be read from the text. The comments
# below describe the statements in order only - confirm the structure against
# the original before running.
while True:
serial_object.open()
print('\nListening on serial...\n\n')
# Step 1: collect bytes until a newline arrives.
received_byte = NULL_BYTE
received_bytearray = bytearray()
while received_byte != NEW_LINE_BYTE:
received_byte = serial_object.read(1) # read a single byte from the port
received_bytearray += received_byte # append it to the frame collected so far
# Redundant with the loop condition: both test the same byte.
if received_byte == NEW_LINE_BYTE:
break
# Step 2: on the init command, send the init response and read the next
# newline-terminated line.
if received_bytearray == INIT_COMMAND:
serial_object.write(INIT_REPONSE)
received_byte = NULL_BYTE
received_bytearray = bytearray()
while received_byte != NEW_LINE_BYTE:
received_byte = serial_object.read(1)
received_bytearray += received_byte
if received_byte == NEW_LINE_BYTE:
break
# Step 3a: on SECOND_COMMAND, send SECOND_REPONSE (with a pause before and
# after) and then read until the byte b'a'.
if received_bytearray == SECOND_COMMAND:
time.sleep(0.5)
serial_object.write(SECOND_REPONSE)
time.sleep(0.5)
received_byte = NULL_BYTE
received_bytearray = bytearray()
while received_byte != A:
received_byte = serial_object.read(1)
received_bytearray += received_byte
if received_byte == A:
break
# Step 4: if the frame just read equals THIRD_REPONSE, acknowledge it and
# read the next frame up to ETX.
if received_bytearray == THIRD_REPONSE:
time.sleep(0.5)
serial_object.write(ACK_BYTE)
received_byte = NULL_BYTE
received_bytearray = bytearray()
while received_byte != ETX_BYTE:
received_byte = serial_object.read(1)
received_bytearray += received_byte
if received_byte == ETX_BYTE:
# One more read() after ETX, with no size argument.
# NOTE(review): presumably fetches the single byte that follows ETX -
# confirm pySerial's default read size.
received_bytearray += serial_object.read()
break
# NOTE(review): with indentation lost it is unclear which `if` this `else`
# pairs with; presumably the SECOND_COMMAND check - confirm.
else:
# Step 3b: on THIRD_COMMAND, send the contents of regs.txt and echo them,
# with their length, to stdout.
if received_bytearray == THIRD_COMMAND:
time.sleep(0.5)
with open('regs.txt', 'rb') as fin:
bytes_from_file = fin.read()
serial_object.write(bytes_from_file)
print(bytes_from_file.decode('utf-8'))
print('length: ' + str(len(bytes_from_file)))
serial_object.close()def get_bytearray(serial, ending_value):
received_byte = NULL_BYTE
received_bytearray = bytearray()
while received_byte != ending_value:
received_byte = serial_object.read(1)
received_bytearray += received_byte
if received_byte == ending_value:
break
return received_bytearray
# Main simulator loop, rewritten on top of get_bytearray(): each pass opens
# the port, waits for a newline-terminated command and reacts to it.
# NOTE(review): this copy has lost its indentation, so the nesting of the
# `if`/`else` statements cannot be read from the text - confirm the structure
# against the original before running.
while True:
serial_object.open()
print('\nListening on serial...\n\n')
# Step 1: read one newline-terminated line.
received_bytearray = get_bytearray(serial_object, NEW_LINE_BYTE)
# Step 2: on the init command, send the init response and read the next line.
if received_bytearray == INIT_COMMAND:
serial_object.write(INIT_REPONSE)
received_bytearray = get_bytearray(serial_object, NEW_LINE_BYTE)
# Step 3a: on SECOND_COMMAND, send SECOND_REPONSE (with a pause before and
# after) and then read until the byte b'a'.
if received_bytearray == SECOND_COMMAND:
time.sleep(0.5)
serial_object.write(SECOND_REPONSE)
time.sleep(0.5)
received_bytearray = get_bytearray(serial_object, A)
# Step 4: if the frame just read equals THIRD_REPONSE, acknowledge it and
# read the next frame up to ETX.
if received_bytearray == THIRD_REPONSE:
time.sleep(0.5)
serial_object.write(ACK_BYTE)
# NOTE(review): the expanded version of this loop performs one extra
# serial_object.read() after ETX; get_bytearray() stops at ETX, so that
# trailing byte is left unread here - confirm whether this is intended.
received_bytearray = get_bytearray(serial_object, ETX_BYTE)
# NOTE(review): with indentation lost it is unclear which `if` this `else`
# pairs with; presumably the SECOND_COMMAND check - confirm.
else:
# Step 3b: on THIRD_COMMAND, send the contents of regs.txt and echo them,
# with their length, to stdout.
if received_bytearray == THIRD_COMMAND:
time.sleep(0.5)
with open('regs.txt', 'rb') as fin:
bytes_from_file = fin.read()
serial_object.write(bytes_from_file)
print(bytes_from_file.decode('utf-8'))
print('length: ' + str(len(bytes_from_file)))
serial_object.close()def get_bytearray(serial, ending_value):
received_bytearray = bytearray()
while True:
received_byte = serial_object.read(1)
received_bytearray += received_byte
if received_byte == ending_value:
return received_bytearray
Context
StackExchange Code Review Q#131964, answer score: 3
Revisions (0)
No revisions yet.