HiveBrain v1.2.0
Get Started
← Back to all entries
patternpythonMinor

Reading analog moisture signal from Arduino over USB to RPi3 and publishing via paho MQTT

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
arduinoreadingsignalrpi3moisturemqttpahousbpublishingvia

Problem

This program is meant to read a moisture sensor attached to an Arduino UNO board via USB to a Raspberry Pi3 which broadcasts it over MQTT.

I am looking for any constructive criticism.

``
import serial
import time
import RPi.GPIO as GPIO
import paho.mqtt.client as mqtt

def main():
try:
while 1:
sendMQTT(read_sensor("clean"),"Planty/moisture_readings")
alerts()
except KeyboardInterrupt:
print "\n\nExiting via Keyboard Interrupt"

def debugger():
# This is just to test each reading
moisture_clean = read_sensor("clean")
output = "Soil Moisture is " + str(moisture_clean) + "%"
moisture_raw_adjusted = read_sensor("raw_adjusted")
output += "\nRaw_adjusted is " + str(moisture_raw_adjusted)
moisture_raw = read_sensor("raw")
output += "\nRaw is " + str(moisture_raw)
sendMQTT(output, "Planty/debug")

def alerts():
# Generates alerts that are humanly readable for certain moisture thresholds
moisture = read_sensor("clean")
alert_low = 10
alert_high = 90
alert_threshold = 30
if moisture alert_low:
output = "Moisture is between 10%
and 30%, watering"
relay(14)
time.sleep(60*1)
if moisture >= alert_high:
output = "Moisture is above 90%"
sendMQTT(output, "Planty/alerts")

def read_sensor(choice):
# Read analog moisture data from moisture probe via Arduino UNO plugged into
# the RPi via USB
min = 330
max = 970
ser = serial.Serial('/dev/ttyACM0', 9600)
#Line below handles the problem of the Arduino returning more than 1 reading
raw_data = int(str.split(ser.readline())[0])
if choice == "raw":
moisture = raw_data
#Stop readings going over or under 0 or 100%
if raw_data max:
raw_data = max
if choice == "raw_adjusted":
moisture = raw_data
if choice == "clean":
#Get moisture as % reading
moisture = int(round(100-(100/float(max-min))*(raw_data

Solution

Here're some changes that I'd personally do:
PEP8 Guidlines

  • Be consistent when writing code! You used both camelCase and snake_case conventions when you declared your functions. Chose one of them and keep it that way. PEP8 recommends the second one.



  • Talking about consistency, you should also have a space after each #. More, when you want to write what your function is doing, you should use the following:



def read_sensor(choice):
    """
    Read analog moisture data from moisture probe via Arduino UNO plugged into
    the RPi via USB
    """
    # ...


  • After each comma (or any other operator) you should have a space



  • This: if moisture alert_low: can be simplified using a chained expression:



if alert_low < moisture <= alert_threshold:
    ....


  • Use string format() instead of using the old + operator.



  • Constants should be UPPER_CASED (but not inside functions). I just took them out.



Personal anecdote:

  • It feels a bit weird to have the main() function at the beginning and everything within that function after. I'd just move it to the end as it will look more naturally.



  • You have several magic numbers which should be declared at the beginning. Doing this, you'll be able to modify their value faster than going through all the code and trying to remember what does what.



  • Don't store variables if you use them only once. It will make you're code harder to follow and it will speed it down (don't think of it as a big speed down but even so...)



The above will lead our code to the following:

import serial
import time
import RPi.GPIO as gpio
import paho.mqtt.client as mqtt

ALERT_LOW = 10
ALERT_HIGH = 90
ALERT_TRESHOLD = 30
RELAY_TIME = 14
TIME_TO_SLEEP = 60 * 1
MIN = 330
MAX = 970
RATE = 9600

def read_sensor(choice):
    """
    Read analog moisture data from moisture probe via Arduino UNO plugged into
    the RPi via USB
    """
    ser = serial.Serial('/dev/ttyACM0', RATE)

    # Line below handles the problem of the Arduino returning more than 1 reading
    raw_data = int(str.split(ser.readline())[0])
    if choice == "raw":
        moisture = raw_data

    # Stop readings going over or under 0 or 100%
    if raw_data  MAX:
        raw_data = MAX
    if choice is "raw_adjusted":
        moisture = raw_data
    if choice == "clean":
        # Get moisture as  % reading
        moisture = int(round(100 - (100 / float(MAX - MIN)) * (raw_data - MIN), 0))

    return moisture

def send_mqtt(output, topic):
    """Publish to MQTT for given topic"""
    host, user, password, port = "MQTTBroker.local", "user", "password", 1883

    mqttc = mqtt.Client()
    mqttc.username_pw_set(user, password)
    mqttc.connect(host, port)
    mqttc.publish(topic, output)

def alerts():
    """ Generates alerts that are humanly readable for certain moisture thresholds"""
    moisture, output = read_sensor("clean"), ''

    if moisture = ALERT_HIGH:
        output = "***Moisture is above 90%***"
    send_mqtt(output, "Planty/alerts")
    
    
def debugger():
    """This is just to test each reading"""
    output = "Soil Moisture is: {}%\n" \
             "Raw_adjusted is: {}\n" \
             "Raw is: {}".format(read_sensor("clean"), read_sensor("raw_adjusted"), read_sensor("raw"))

    send_mqtt(output, "Planty/debug")

def relay(pin):
    """Turn on relay via PIN.HIGH"""
    try:
        gpio.setmode(gpio.BCM)
        gpio.setwarnings(False)
        gpio.setup(pin, gpio.OUT, initial=1)
        send_mqtt("Turning Relay1 ON", "Planty/relay")
        gpio.output(pin, 0)
        time.sleep(5)
        send_mqtt("Turning Relay1 OFF", "Planty/relay")
        gpio.output(pin, 1)
    finally:
        gpio.cleanup()

def main():
    try:
        while 1:
            send_mqtt(read_sensor("clean"), "Planty/moisture_readings")
            alerts()
    except KeyboardInterrupt:
        print("\n\nExiting via Keyboard Interrupt")

if __name__ == '__main__':
    main()


Since you're using the try/except in main only for KeyboardInterrupt I'd completely remove it as it seems redundant.

def main():
    while 1:
        send_mqtt(read_sensor("clean"), "Planty/moisture_readings")
        alerts()

Code Snippets

def read_sensor(choice):
    """
    Read analog moisture data from moisture probe via Arduino UNO plugged into
    the RPi via USB
    """
    # ...
if alert_low < moisture <= alert_threshold:
    ....
import serial
import time
import RPi.GPIO as gpio
import paho.mqtt.client as mqtt

ALERT_LOW = 10
ALERT_HIGH = 90
ALERT_TRESHOLD = 30
RELAY_TIME = 14
TIME_TO_SLEEP = 60 * 1
MIN = 330
MAX = 970
RATE = 9600


def read_sensor(choice):
    """
    Read analog moisture data from moisture probe via Arduino UNO plugged into
    the RPi via USB
    """
    ser = serial.Serial('/dev/ttyACM0', RATE)

    # Line below handles the problem of the Arduino returning more than 1 reading
    raw_data = int(str.split(ser.readline())[0])
    if choice == "raw":
        moisture = raw_data

    # Stop readings going over or under 0 or 100%
    if raw_data < MIN:
        raw_data = MIN
    if raw_data > MAX:
        raw_data = MAX
    if choice is "raw_adjusted":
        moisture = raw_data
    if choice == "clean":
        # Get moisture as  % reading
        moisture = int(round(100 - (100 / float(MAX - MIN)) * (raw_data - MIN), 0))

    return moisture


def send_mqtt(output, topic):
    """Publish to MQTT for given topic"""
    host, user, password, port = "MQTTBroker.local", "user", "password", 1883

    mqttc = mqtt.Client()
    mqttc.username_pw_set(user, password)
    mqttc.connect(host, port)
    mqttc.publish(topic, output)


def alerts():
    """ Generates alerts that are humanly readable for certain moisture thresholds"""
    moisture, output = read_sensor("clean"), ''

    if moisture <= ALERT_LOW:
        output = "**Moisture is below 10%**"
    if ALERT_LOW < moisture <= ALERT_TRESHOLD:
        output = "***Moisture is between 10%` and 30%, watering***"
        relay(RELAY_TIME)
        time.sleep(TIME_TO_SLEEP)
    if moisture >= ALERT_HIGH:
        output = "***Moisture is above 90%***"
    send_mqtt(output, "Planty/alerts")
    
    
def debugger():
    """This is just to test each reading"""
    output = "Soil Moisture is: {}%\n" \
             "Raw_adjusted is: {}\n" \
             "Raw is: {}".format(read_sensor("clean"), read_sensor("raw_adjusted"), read_sensor("raw"))

    send_mqtt(output, "Planty/debug")


def relay(pin):
    """Turn on relay via PIN.HIGH"""
    try:
        gpio.setmode(gpio.BCM)
        gpio.setwarnings(False)
        gpio.setup(pin, gpio.OUT, initial=1)
        send_mqtt("Turning Relay1 ON", "Planty/relay")
        gpio.output(pin, 0)
        time.sleep(5)
        send_mqtt("Turning Relay1 OFF", "Planty/relay")
        gpio.output(pin, 1)
    finally:
        gpio.cleanup()


def main():
    try:
        while 1:
            send_mqtt(read_sensor("clean"), "Planty/moisture_readings")
            alerts()
    except KeyboardInterrupt:
        print("\n\nExiting via Keyboard Interrupt")


if __name__ == '__main__':
    main()
def main():
    while 1:
        send_mqtt(read_sensor("clean"), "Planty/moisture_readings")
        alerts()

Context

StackExchange Code Review Q#146482, answer score: 3

Revisions (0)

No revisions yet.