Simple meter simulator

Submitted by: @import:stackexchange-codereview

Problem

I've created a simple Python meter simulator that connects to a serial port (COM1 in my case), listens on it, and takes actions based on what it receives.

I'm looking for some advice regarding:

  • unnecessary or wrong logic (it looks like I've abused those while loops)

  • code that could be rewritten in a more concise, Pythonic way (perhaps some functions to avoid the repeated code)

  • PEP 8 compliance is not my main concern, but any advice on it is welcome.

To understand how the flow actually works:

  • from the third-party software (written for the real meter) I send a command (say, read_register)

  • monitoring the raw data on COM1, I can see that a bytearray like this is written: /?35169984!\r\n. The raw data read back for this specific bytearray is /ELS2\\@V8.22 \r\n. (The exchange may continue beyond this, but the pattern should be easy to follow from here on.)

  • once that exchange is done, I receive a list of indexes from the real meter (in my case, I just have a file that contains exactly the indexes the real meter sends)

[Two screenshots omitted: the data written to COM1 and the data read from it.]

Please ask me if there's anything unclear and I'll try to provide as many details as possible.

```
import serial
import time

# configure the serial connection
serial_object = serial.Serial(
    port='COM1',
    baudrate=1200,
    parity=serial.PARITY_EVEN,
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.SEVENBITS
)
serial_object.close()

dict_of_bytes = {
    'null_byte': b'',
    'new_line_byte': b'\n',
    'ack_byte': b'\x06',
    'a': b'a',
    'etx_byte': b'\x03'
}

dict_of_bytearray = {
    'init_command': b'/?35169984!\r\n',
    'second_command': b'\x06021\r\n',
    'third_command': b'\x06020\r\n'
}

dict_of_responses = {
    'init_response': b'/ELS2\\@V8.22 \r\n',
    'second_response': b'\x01P0\x02(35169984)\x03m Soll: m',
    # ... (the rest of the listing is truncated in the source)
}
```

Solution

Disclaimer: None of this review has been tested. I've only applied a few mechanical transformations to your original code.

Making things simpler

Your dict_of_ dictionaries are constant and are always indexed with literal strings. In my view, it would be much clearer to use module-level constants to achieve the same thing:

import serial
import time

# configure the serial connections
serial_object = serial.Serial(
    port='COM1',
    baudrate=1200,
    parity=serial.PARITY_EVEN,
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.SEVENBITS
)
serial_object.close()

# Bytes
NULL_BYTE = b''
NEW_LINE_BYTE = b'\n'
ACK_BYTE = b'\x06'
A = b'a'
ETX_BYTE = b'\x03'

# Commands
INIT_COMMAND = b'/?35169984!\r\n'
SECOND_COMMAND = b'\x06021\r\n'
THIRD_COMMAND = b'\x06020\r\n'

# Responses
INIT_REPONSE = b'/ELS2\\@V8.22         \r\n'
SECOND_REPONSE = b'\x01P0\x02(35169984)\x03m Soll: m'
THIRD_REPONSE = b'\x01P1\x02(00000000)\x03a'

while True:
    serial_object.open()
    print('\nListening on serial...\n\n')
    received_byte = NULL_BYTE
    received_bytearray = bytearray()

    while received_byte != NEW_LINE_BYTE:
        received_byte = serial_object.read(1)  # this reads byte-by-byte what it comes on COM1
        received_bytearray += received_byte # create the entire bytearray

        if received_byte == NEW_LINE_BYTE:
            break

    if received_bytearray == INIT_COMMAND:
        serial_object.write(INIT_REPONSE)
        received_byte = NULL_BYTE
        received_bytearray = bytearray()

        while received_byte != NEW_LINE_BYTE:
            received_byte = serial_object.read(1)
            received_bytearray += received_byte

            if received_byte == NEW_LINE_BYTE:
                break

        if received_bytearray == SECOND_COMMAND:
            time.sleep(0.5)
            serial_object.write(SECOND_REPONSE)
            time.sleep(0.5)
            received_byte = NULL_BYTE
            received_bytearray = bytearray()

            while received_byte != A:
                received_byte = serial_object.read(1)
                received_bytearray += received_byte

                if received_byte == A:
                    break

            if received_bytearray == THIRD_REPONSE:
                time.sleep(0.5)
                serial_object.write(ACK_BYTE)
                received_byte = NULL_BYTE
                received_bytearray = bytearray()

                while received_byte != ETX_BYTE:
                    received_byte = serial_object.read(1)
                    received_bytearray += received_byte

                    if received_byte == ETX_BYTE:
                        received_bytearray += serial_object.read()
                        break

        else:
            if received_bytearray == THIRD_COMMAND:
                time.sleep(0.5)

                with open('regs.txt', 'rb') as fin:
                    bytes_from_file = fin.read()
                    serial_object.write(bytes_from_file)
                    print(bytes_from_file.decode('utf-8'))
                    print('length: ' + str(len(bytes_from_file)))
    serial_object.close()


Don't repeat yourself

The same read loop appears in several places. It is worth extracting it into a function of its own:

def get_bytearray(port, ending_value):
    # Read from the port passed in, not from the global serial_object.
    received_byte = NULL_BYTE
    received_bytearray = bytearray()
    while received_byte != ending_value:
        received_byte = port.read(1)
        received_bytearray += received_byte
        if received_byte == ending_value:
            break
    return received_bytearray

while True:
    serial_object.open()
    print('\nListening on serial...\n\n')
    received_bytearray = get_bytearray(serial_object, NEW_LINE_BYTE)

    if received_bytearray == INIT_COMMAND:
        serial_object.write(INIT_REPONSE)
        received_bytearray = get_bytearray(serial_object, NEW_LINE_BYTE)

        if received_bytearray == SECOND_COMMAND:
            time.sleep(0.5)
            serial_object.write(SECOND_REPONSE)
            time.sleep(0.5)
            received_bytearray = get_bytearray(serial_object, A)

            if received_bytearray == THIRD_REPONSE:
                time.sleep(0.5)
                serial_object.write(ACK_BYTE)
                received_bytearray = get_bytearray(serial_object, ETX_BYTE)
        else:
            if received_bytearray == THIRD_COMMAND:
                time.sleep(0.5)

                with open('regs.txt', 'rb') as fin:
                    bytes_from_file = fin.read()
                    serial_object.write(bytes_from_file)
                    print(bytes_from_file.decode('utf-8'))
                    print('length: ' + str(len(bytes_from_file)))
    serial_object.close()
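
One practical benefit of the extracted helper is that it can be exercised without a meter or a COM port. The sketch below is only an illustration and was not part of the original review: FakePort is a hypothetical stand-in that mimics nothing more than the read(size) call of a pyserial port, and the helper is repeated here, taking the port as its argument, so that the example is self-contained.

```python
import io

NULL_BYTE = b''
NEW_LINE_BYTE = b'\n'


class FakePort:
    """Hypothetical stand-in for a serial port: serves bytes from memory."""

    def __init__(self, data):
        self._buffer = io.BytesIO(data)

    def read(self, size=1):
        # Return at most `size` bytes, as a port's read() would.
        return self._buffer.read(size)


def get_bytearray(port, ending_value):
    """Read byte by byte until `ending_value` has been received."""
    received_byte = NULL_BYTE
    received_bytearray = bytearray()
    while received_byte != ending_value:
        received_byte = port.read(1)
        received_bytearray += received_byte
        if received_byte == ending_value:
            break
    return received_bytearray


# Two messages back to back, as they would arrive on the wire.
port = FakePort(b'/?35169984!\r\n\x06021\r\n')
first = get_bytearray(port, NEW_LINE_BYTE)
second = get_bytearray(port, NEW_LINE_BYTE)
print(first)
print(second)
```

Each call consumes exactly one newline-terminated message, so the two reads give back the init command and the second command separately.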


Re-write your function

Now that part of the logic lives in its own function, it is much easier to reason about and to rewrite.

In your case, I'd simply write:

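```
def get_bytearray(port, ending_value):
    received_bytearray = bytearray()
    while True:
        # Read from the port passed in, one byte at a time.
        received_byte = port.read(1)
        received_bytearray += received_byte
        if received_byte == ending_value:
            return received_bytearray
```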
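
One caveat with the while True form: the original configuration sets no timeout, so read(1) blocks until a byte arrives. If the port were opened with a timeout, pyserial's read(1) would return an empty bytes object once it expires, and the loop would then spin forever. A minimal sketch of a guard follows, assuming a hypothetical helper name read_until_byte and using io.BytesIO as a stand-in for the port:

```python
import io


def read_until_byte(port, ending_value):
    """Collect bytes from `port` until `ending_value` arrives.

    If a read comes back empty (which is how a timed-out read is
    reported), return whatever has been collected so far.
    """
    received = bytearray()
    while True:
        received_byte = port.read(1)
        if not received_byte:  # timeout or end of data: stop waiting
            return received
        received += received_byte
        if received_byte == ending_value:
            return received


# io.BytesIO stands in for the serial port; it also offers read(size).
complete = read_until_byte(io.BytesIO(b'\x06020\r\nextra'), b'\n')
partial = read_until_byte(io.BytesIO(b'\x0602'), b'\n')
print(complete)
print(partial)
```

The caller can tell a complete message from a partial one by checking whether the result ends with the terminator. pyserial's Serial class also offers a read_until method that may cover the same need; check the documentation of the installed version before relying on its exact signature.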

Code Snippets

def get_bytearray(port, ending_value):
    received_bytearray = bytearray()
    while True:
        # Read from the port passed in, one byte at a time.
        received_byte = port.read(1)
        received_bytearray += received_byte
        if received_byte == ending_value:
            return received_bytearray
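
With the helper in place, the nested if blocks of the main loop could also be flattened by moving one conversation into a function that returns as soon as an unexpected message arrives. The sketch below only illustrates that idea and rests on several assumptions: handle_session and ScriptedPort are hypothetical names, the time.sleep delays are left out, and the register dump is passed in as a made-up placeholder rather than read from regs.txt.

```python
import io

INIT_COMMAND = b'/?35169984!\r\n'
SECOND_COMMAND = b'\x06021\r\n'
THIRD_COMMAND = b'\x06020\r\n'
INIT_REPONSE = b'/ELS2\\@V8.22         \r\n'
SECOND_REPONSE = b'\x01P0\x02(35169984)\x03m Soll: m'
THIRD_REPONSE = b'\x01P1\x02(00000000)\x03a'
ACK_BYTE = b'\x06'


def get_bytearray(port, ending_value):
    received_bytearray = bytearray()
    while True:
        received_byte = port.read(1)
        received_bytearray += received_byte
        if received_byte == ending_value:
            return received_bytearray


def handle_session(port, registers):
    """Run one conversation; return early on any unexpected message."""
    if get_bytearray(port, b'\n') != INIT_COMMAND:
        return
    port.write(INIT_REPONSE)

    command = get_bytearray(port, b'\n')
    if command == THIRD_COMMAND:
        port.write(registers)  # the register dump replaces regs.txt here
        return
    if command != SECOND_COMMAND:
        return
    port.write(SECOND_REPONSE)

    if get_bytearray(port, b'a') != THIRD_REPONSE:
        return
    port.write(ACK_BYTE)
    get_bytearray(port, b'\x03')


class ScriptedPort:
    """Hypothetical test double: replays input and records output."""

    def __init__(self, incoming):
        self._incoming = io.BytesIO(incoming)
        self.written = []

    def read(self, size=1):
        return self._incoming.read(size)

    def write(self, data):
        self.written.append(bytes(data))


# Placeholder payload, not real meter data.
port = ScriptedPort(INIT_COMMAND + THIRD_COMMAND)
handle_session(port, registers=b'REGISTER DUMP')
print(port.written)
```

The outer loop would then shrink to opening the port, calling handle_session and closing the port again, and each branch of the protocol reads top to bottom without extra indentation.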

Context

StackExchange Code Review Q#131964, answer score: 3
