patternpythonMinor
Reading analog moisture signal from Arduino over USB to RPi3 and publishing via paho MQTT
Viewed 0 times
arduinoreadingsignalrpi3moisturemqttpahousbpublishingvia
Problem
This program is meant to read a moisture sensor attached to an Arduino UNO board via USB to a Raspberry Pi3 which broadcasts it over MQTT.
I am looking for any constructive criticism.
``
relay(14)
time.sleep(60*1)
if moisture >= alert_high:
output = "Moisture is above 90%"
sendMQTT(output, "Planty/alerts")
def read_sensor(choice):
# Read analog moisture data from moisture probe via Arduino UNO plugged into
# the RPi via USB
min = 330
max = 970
ser = serial.Serial('/dev/ttyACM0', 9600)
#Line below handles the problem of the Arduino returning more than 1 reading
raw_data = int(str.split(ser.readline())[0])
if choice == "raw":
moisture = raw_data
#Stop readings going over or under 0 or 100%
if raw_data max:
raw_data = max
if choice == "raw_adjusted":
moisture = raw_data
if choice == "clean":
#Get moisture as % reading
moisture = int(round(100-(100/float(max-min))*(raw_data
I am looking for any constructive criticism.
``
import serial
import time
import RPi.GPIO as GPIO
import paho.mqtt.client as mqtt
def main():
try:
while 1:
sendMQTT(read_sensor("clean"),"Planty/moisture_readings")
alerts()
except KeyboardInterrupt:
print "\n\nExiting via Keyboard Interrupt"
def debugger():
# This is just to test each reading
moisture_clean = read_sensor("clean")
output = "Soil Moisture is " + str(moisture_clean) + "%"
moisture_raw_adjusted = read_sensor("raw_adjusted")
output += "\nRaw_adjusted is " + str(moisture_raw_adjusted)
moisture_raw = read_sensor("raw")
output += "\nRaw is " + str(moisture_raw)
sendMQTT(output, "Planty/debug")
def alerts():
# Generates alerts that are humanly readable for certain moisture thresholds
moisture = read_sensor("clean")
alert_low = 10
alert_high = 90
alert_threshold = 30
if moisture alert_low:
output = "Moisture is between 10% and 30%, watering"relay(14)
time.sleep(60*1)
if moisture >= alert_high:
output = "Moisture is above 90%"
sendMQTT(output, "Planty/alerts")
def read_sensor(choice):
# Read analog moisture data from moisture probe via Arduino UNO plugged into
# the RPi via USB
min = 330
max = 970
ser = serial.Serial('/dev/ttyACM0', 9600)
#Line below handles the problem of the Arduino returning more than 1 reading
raw_data = int(str.split(ser.readline())[0])
if choice == "raw":
moisture = raw_data
#Stop readings going over or under 0 or 100%
if raw_data max:
raw_data = max
if choice == "raw_adjusted":
moisture = raw_data
if choice == "clean":
#Get moisture as % reading
moisture = int(round(100-(100/float(max-min))*(raw_data
Solution
Here are some changes that I'd personally make:
PEP8 Guidelines
Personal anecdote:
The above will lead our code to the following:
Since you're using the try/except in main only for
PEP8 Guidelines
- Be consistent when writing code! You used both camelCase and snake_case conventions when you declared your functions. Choose one of them and stick with it. PEP8 recommends the second one.
- Speaking of consistency, you should also have a space after each `#`. Moreover, when you want to describe what your function is doing, you should use a docstring, like this:
def read_sensor(choice):
"""
Read analog moisture data from moisture probe via Arduino UNO plugged into
the RPi via USB
"""
    # ...
- After each comma (and around each operator) you should have a space.
- This:
if moisture <= alert_threshold and moisture > alert_low:
can be simplified using a chained expression:
if alert_low < moisture <= alert_threshold:
    ....
- Use string format() instead of the old + operator.
- Constants should be UPPER_CASED (but not inside functions). I just took them out.
Personal anecdote:
- It feels a bit weird to have the main() function at the beginning and everything it calls defined after it. I'd just move it to the end, as that will look more natural.
- You have several magic numbers which should be declared at the beginning. Doing this, you'll be able to modify their value faster than going through all the code and trying to remember what does what.
- Don't store variables if you use them only once. It will make your code harder to follow and it will slow it down (not by much, but even so...)
The above will lead our code to the following:
import serial
import time
import RPi.GPIO as gpio
import paho.mqtt.client as mqtt
# Moisture percentage thresholds compared against the "clean" reading in alerts().
ALERT_LOW = 10
ALERT_HIGH = 90
# NOTE(review): the name is spelled "TRESHOLD"; kept as-is because alerts() refers to it.
ALERT_TRESHOLD = 30
# NOTE(review): despite its name, this value is passed to relay() as the pin argument.
RELAY_TIME = 14
# Passed to time.sleep() in alerts() after the relay has been run.
TIME_TO_SLEEP = 60 * 1
# Raw readings are clamped to MIN..MAX in read_sensor() before being turned
# into a percentage.
MIN = 330
MAX = 970
# Second argument to serial.Serial() in read_sensor().
RATE = 9600
def read_sensor(choice):
    """
    Read one moisture value from the Arduino UNO plugged into the RPi via USB.

    ``choice`` selects the form of the returned reading:

    * ``"raw"`` - the integer exactly as parsed from the serial line.
    * ``"raw_adjusted"`` - the raw integer clamped to the range MIN..MAX.
    * ``"clean"`` - the clamped value mapped linearly to a whole-number
      percentage, where MIN gives 100 and MAX gives 0.

    Raises ValueError for any other ``choice``.
    """
    if choice not in ("raw", "raw_adjusted", "clean"):
        # Reject unknown modes up front rather than failing later on an
        # unbound result.
        raise ValueError("unknown reading type: {!r}".format(choice))

    # Close the port again as soon as one line has been read; this function
    # is called repeatedly and would otherwise leave a handle open each time.
    ser = serial.Serial('/dev/ttyACM0', RATE)
    try:
        line = ser.readline()
    finally:
        ser.close()

    # A line may hold more than one reading; only the first one is used.
    # Calling .split() on the object itself works for both str and bytes.
    raw_data = int(line.split()[0])
    if choice == "raw":
        return raw_data

    # Stop readings going over or under 0 or 100%
    clamped = min(max(raw_data, MIN), MAX)
    if choice == "raw_adjusted":  # compare by value ("=="), never with "is"
        return clamped

    # Get moisture as % reading
    return int(round(100 - (100 / float(MAX - MIN)) * (clamped - MIN), 0))
def send_mqtt(output, topic):
    """
    Publish ``output`` to ``topic`` on the MQTT broker.

    A new client is created, authenticated and connected for every call,
    and disconnected again once the message has been handed over.
    """
    # NOTE(review): broker address and credentials are hard-coded here;
    # consider loading them from configuration instead.
    host, user, password, port = "MQTTBroker.local", "user", "password", 1883
    mqttc = mqtt.Client()
    mqttc.username_pw_set(user, password)
    mqttc.connect(host, port)
    try:
        mqttc.publish(topic, output)
    finally:
        # Without this, every call leaves a broker connection behind.
        mqttc.disconnect()
def alerts():
    """
    Publish a human-readable alert when the moisture reading hits a threshold.

    * At or below ALERT_LOW: publish a "below" alert.
    * Above ALERT_LOW and up to ALERT_TRESHOLD: run the relay, sleep for
      TIME_TO_SLEEP seconds, then publish a "watering" alert.
    * At or above ALERT_HIGH: publish an "above" alert.

    Nothing is published when the reading is between ALERT_TRESHOLD and
    ALERT_HIGH.
    """
    moisture = read_sensor("clean")
    output = ''
    if moisture <= ALERT_LOW:
        output = "**Moisture is below {}%**".format(ALERT_LOW)
    elif ALERT_LOW < moisture <= ALERT_TRESHOLD:
        output = "***Moisture is between {}% and {}%, watering***".format(
            ALERT_LOW, ALERT_TRESHOLD)
        # RELAY_TIME is handed to relay() as its pin argument.
        relay(RELAY_TIME)
        time.sleep(TIME_TO_SLEEP)
    elif moisture >= ALERT_HIGH:
        output = "***Moisture is above {}%***".format(ALERT_HIGH)

    # Skip the publish when no threshold was hit, so the alerts topic is not
    # flooded with empty messages.
    if output:
        send_mqtt(output, "Planty/alerts")
def debugger():
    """Publish all three reading variants in one message, for testing."""
    template = (
        "Soil Moisture is: {}%\n"
        "Raw_adjusted is: {}\n"
        "Raw is: {}"
    )
    # Readings are taken in the same order as they appear in the message.
    clean = read_sensor("clean")
    adjusted = read_sensor("raw_adjusted")
    raw = read_sensor("raw")
    send_mqtt(template.format(clean, adjusted, raw), "Planty/debug")
def relay(pin, duration=5):
    """
    Switch the relay on ``pin`` (BCM numbering) on for ``duration`` seconds.

    The pin is set up as an output starting at 1, driven to 0 while the relay
    is on, and driven back to 1 afterwards. An MQTT message is published to
    "Planty/relay" before switching on and after switching off.

    The pin is returned to 1 even if the wait is interrupted, and
    gpio.cleanup() always runs.
    """
    try:
        gpio.setmode(gpio.BCM)
        gpio.setwarnings(False)
        gpio.setup(pin, gpio.OUT, initial=1)
        send_mqtt("Turning Relay1 ON", "Planty/relay")
        gpio.output(pin, 0)
        try:
            time.sleep(duration)
        finally:
            # Switch off first, then report it: a failing publish or a
            # Ctrl-C during the wait must not leave the relay driven on.
            gpio.output(pin, 1)
        send_mqtt("Turning Relay1 OFF", "Planty/relay")
    finally:
        gpio.cleanup()
def main():
    """Publish readings and run the alert check until interrupted with Ctrl-C."""
    try:
        while True:
            reading = read_sensor("clean")
            send_mqtt(reading, "Planty/moisture_readings")
            alerts()
    except KeyboardInterrupt:
        print("\n\nExiting via Keyboard Interrupt")
if __name__ == '__main__':
    main()

Since you're using the try/except in main only for KeyboardInterrupt, I'd remove it completely, as it seems redundant:

def main():
    while 1:
        send_mqtt(read_sensor("clean"), "Planty/moisture_readings")
        alerts()

Code Snippets
def read_sensor(choice):
"""
Read analog moisture data from moisture probe via Arduino UNO plugged into
the RPi via USB
"""
    # ...

if alert_low < moisture <= alert_threshold:
    ....

import serial
import time
import RPi.GPIO as gpio
import paho.mqtt.client as mqtt
# Moisture percentage thresholds compared against the "clean" reading in alerts().
ALERT_LOW = 10
ALERT_HIGH = 90
# NOTE(review): the name is spelled "TRESHOLD"; kept as-is because alerts() refers to it.
ALERT_TRESHOLD = 30
# NOTE(review): despite its name, this value is passed to relay() as the pin argument.
RELAY_TIME = 14
# Passed to time.sleep() in alerts() after the relay has been run.
TIME_TO_SLEEP = 60 * 1
# Raw readings are clamped to MIN..MAX in read_sensor() before being turned
# into a percentage.
MIN = 330
MAX = 970
# Second argument to serial.Serial() in read_sensor().
RATE = 9600
def read_sensor(choice):
    """
    Read one moisture value from the Arduino UNO plugged into the RPi via USB.

    ``choice`` selects the form of the returned reading:

    * ``"raw"`` - the integer exactly as parsed from the serial line.
    * ``"raw_adjusted"`` - the raw integer clamped to the range MIN..MAX.
    * ``"clean"`` - the clamped value mapped linearly to a whole-number
      percentage, where MIN gives 100 and MAX gives 0.

    Raises ValueError for any other ``choice``.
    """
    if choice not in ("raw", "raw_adjusted", "clean"):
        # Reject unknown modes up front rather than failing later on an
        # unbound result.
        raise ValueError("unknown reading type: {!r}".format(choice))

    # Close the port again as soon as one line has been read; this function
    # is called repeatedly and would otherwise leave a handle open each time.
    ser = serial.Serial('/dev/ttyACM0', RATE)
    try:
        line = ser.readline()
    finally:
        ser.close()

    # A line may hold more than one reading; only the first one is used.
    # Calling .split() on the object itself works for both str and bytes.
    raw_data = int(line.split()[0])
    if choice == "raw":
        return raw_data

    # Stop readings going over or under 0 or 100%
    clamped = min(max(raw_data, MIN), MAX)
    if choice == "raw_adjusted":  # compare by value ("=="), never with "is"
        return clamped

    # Get moisture as % reading
    return int(round(100 - (100 / float(MAX - MIN)) * (clamped - MIN), 0))
def send_mqtt(output, topic):
    """
    Publish ``output`` to ``topic`` on the MQTT broker.

    A new client is created, authenticated and connected for every call,
    and disconnected again once the message has been handed over.
    """
    # NOTE(review): broker address and credentials are hard-coded here;
    # consider loading them from configuration instead.
    host, user, password, port = "MQTTBroker.local", "user", "password", 1883
    mqttc = mqtt.Client()
    mqttc.username_pw_set(user, password)
    mqttc.connect(host, port)
    try:
        mqttc.publish(topic, output)
    finally:
        # Without this, every call leaves a broker connection behind.
        mqttc.disconnect()
def alerts():
    """
    Publish a human-readable alert when the moisture reading hits a threshold.

    * At or below ALERT_LOW: publish a "below" alert.
    * Above ALERT_LOW and up to ALERT_TRESHOLD: run the relay, sleep for
      TIME_TO_SLEEP seconds, then publish a "watering" alert.
    * At or above ALERT_HIGH: publish an "above" alert.

    Nothing is published when the reading is between ALERT_TRESHOLD and
    ALERT_HIGH.
    """
    moisture = read_sensor("clean")
    output = ''
    if moisture <= ALERT_LOW:
        output = "**Moisture is below {}%**".format(ALERT_LOW)
    elif ALERT_LOW < moisture <= ALERT_TRESHOLD:
        output = "***Moisture is between {}% and {}%, watering***".format(
            ALERT_LOW, ALERT_TRESHOLD)
        # RELAY_TIME is handed to relay() as its pin argument.
        relay(RELAY_TIME)
        time.sleep(TIME_TO_SLEEP)
    elif moisture >= ALERT_HIGH:
        output = "***Moisture is above {}%***".format(ALERT_HIGH)

    # Skip the publish when no threshold was hit, so the alerts topic is not
    # flooded with empty messages.
    if output:
        send_mqtt(output, "Planty/alerts")
def debugger():
    """Publish all three reading variants in one message, for testing."""
    template = (
        "Soil Moisture is: {}%\n"
        "Raw_adjusted is: {}\n"
        "Raw is: {}"
    )
    # Readings are taken in the same order as they appear in the message.
    clean = read_sensor("clean")
    adjusted = read_sensor("raw_adjusted")
    raw = read_sensor("raw")
    send_mqtt(template.format(clean, adjusted, raw), "Planty/debug")
def relay(pin, duration=5):
    """
    Switch the relay on ``pin`` (BCM numbering) on for ``duration`` seconds.

    The pin is set up as an output starting at 1, driven to 0 while the relay
    is on, and driven back to 1 afterwards. An MQTT message is published to
    "Planty/relay" before switching on and after switching off.

    The pin is returned to 1 even if the wait is interrupted, and
    gpio.cleanup() always runs.
    """
    try:
        gpio.setmode(gpio.BCM)
        gpio.setwarnings(False)
        gpio.setup(pin, gpio.OUT, initial=1)
        send_mqtt("Turning Relay1 ON", "Planty/relay")
        gpio.output(pin, 0)
        try:
            time.sleep(duration)
        finally:
            # Switch off first, then report it: a failing publish or a
            # Ctrl-C during the wait must not leave the relay driven on.
            gpio.output(pin, 1)
        send_mqtt("Turning Relay1 OFF", "Planty/relay")
    finally:
        gpio.cleanup()
def main():
    """Publish readings and run the alert check until interrupted with Ctrl-C."""
    try:
        while True:
            reading = read_sensor("clean")
            send_mqtt(reading, "Planty/moisture_readings")
            alerts()
    except KeyboardInterrupt:
        print("\n\nExiting via Keyboard Interrupt")
if __name__ == '__main__':
    main()

def main():
    while 1:
        send_mqtt(read_sensor("clean"), "Planty/moisture_readings")
        alerts()

Context
StackExchange Code Review Q#146482, answer score: 3
Revisions (0)
No revisions yet.