From e50121bb8fbd77913cb61c844d399e809469c836 Mon Sep 17 00:00:00 2001 From: "A. Eilers" Date: Mon, 15 Jan 2024 22:26:08 +0100 Subject: [PATCH] Added MQTT and HomeAssistant support. added a new section 'data-receiver:' to config.yaml added libraries tzlocal, pytz and paho-mqtt added MQTT.py modified main.py --- MQTT.py | 122 ++++++++++++++ config.yaml | 13 ++ get_pi_requirements.sh | 9 +- main.py | 354 +++++++++++++++++++++++++---------------- 4 files changed, 361 insertions(+), 137 deletions(-) create mode 100644 MQTT.py diff --git a/MQTT.py b/MQTT.py new file mode 100644 index 0000000..cf9d387 --- /dev/null +++ b/MQTT.py @@ -0,0 +1,122 @@ +import time +from paho.mqtt import client as mqtt_client +import threading +import json +import os +import logging + + +class SendMQTT(threading.Thread): + def __init__(self, brokerIP: str, username: str, password: str, brokerPort: int = 1883): + super().__init__() + self.BrokerIP = brokerIP + self.BrokerPort = brokerPort + self.Username = username + self.Password = password + self.MyClient = mqtt_client.Client() + self.connection_alive = False + self.Serial = None + self.Cmnd = None + self.CmndValue = None + self.logger = logging.getLogger('main.mqtt') + + def __on_message(self, client, userdata, message): + print(f"Message: {message.topic} {message.payload.decode('utf-8')}") + myStringArray = message.topic.split('/') + self.Serial, self.Cmnd, self.CmndValue = myStringArray[1], myStringArray[3], message.payload.decode('utf-8') + self.receivedCmnd() + + def __on_connect(self, client='', userdata='', flags='', rc=''): + if rc == 0: + print(client, userdata) + print("connected OK") + self.logger.info("MQTT connected OK") + self.connection_alive = True + else: + print("Bad connection Return code=", rc, client, userdata, flags) + self.logger.info(f"Bad connection Return code={rc}") + self.connection_alive = False + + def __on_disconnect(self, client='', userdata='', rc=''): + print("Client disconnected") + self.logger.info("Connection to Broker disconnected") + self.connection_alive = False + + def __on_subscribe(self, client, userdata, mid, granted_qos): + print("Mid", client, userdata, mid, granted_qos) + return rc + + def run(self): + self.MyClient.on_connect = self.__on_connect + self.MyClient.on_disconnect = self.__on_disconnect + self.MyClient.on_subscribe = self.__on_subscribe + self.MyClient.on_message = self.__on_message + self.MyClient.loop_start() + self.MyClient.username_pw_set(username=self.Username, password=self.Password) + rc = self.MyClient.connect(self.BrokerIP, self.BrokerPort, 30) + self.logger.debug(f"Broker connect() Return code={rc}") + + def closeConnection(self): + rc = self.MyClient.disconnect() + self.logger.info("Connection to broker disconnected") + + def sendData(self, topic: str, jdata): + MyTopic = topic + self.MyClient.publish(MyTopic, str(jdata)) + + def discover_HASS(self, station_id: str): + print("Home Assistant discover") + self.logger.info("Home Assistant discover") + # sensor_types: sensor_type, unit_of_measurement, name, device_class,topic,icon + sensor_types = [ + ("temperature", "°C", "temperature", "temperature", "environment", "mdi:temperature-celsius"), + ("humidity", "%", "humidity", "humidity", "environment", "mdi:water-percent"), + ("movement_duration", None, "duration", None, "movement", "mdi:timer-marker-outline"), + ("movement_id", None, "id", None, "movement", "mdi:identifier"), + ("station_id", None, "station_id", None, "movement", "mdi:id-card"), + ("weight", "g", "weight", "weight", "movement", "mdi:weight-gram"), + # ("foodlevel", "%", "foodlevel", "mdi:gauge"), + # ("pressure", "hPa", "pressure", "pressure"), + # ("illuminance", "lx", "illuminance", "illuminance"), + ] + + for sensor_type, unit, name, device_class, topic, icon in sensor_types: + mytopic = os.path.join("homeassistant", "sensor", f"birdiary_{station_id}_{sensor_type}", "config") + mpayload = { + "state_topic": os.path.join("birdiary", station_id, topic), + "unit_of_measurement": unit, + "value_template": f"{{{{value_json.{name}}}}}", + "icon": icon, + "name": f"birdiary_{sensor_type}", + "unique_id": f"birdiary_{station_id}_{sensor_type}", + "device_class": device_class, + "device": { + "name": f"birdiary_station_{station_id[:8]}", + "manufacturer": "wiediversistmeingarten.org", + "identifiers": f"{station_id}", + "model": "birdiary_station" + } + } + self.MyClient.publish(mytopic, json.dumps(mpayload)) + time.sleep(0.5) + sensor_types = [ + ("movement", "start_date", "timestamp", "movement"), + ] + for sensor_type, name, device_class, topic in sensor_types: + mytopic = os.path.join("homeassistant", "sensor", f"birdiary_{station_id}_{sensor_type}", "config") + mpayload = { + "state_topic": os.path.join("birdiary", station_id, topic), + "value_template": f"{{{{value_json.{name}}}}}", + "name": f"birdiary_{sensor_type}", + "icon": "mdi:bird", + "unique_id": f"birdiary_{station_id}_{sensor_type}", + "device_class": device_class, + "device": { + "name": f"birdiary_station_{station_id[:8]}", + "manufacturer": "https://wiediversistmeingarten.org", + "identifiers": f"{station_id}", + "model": "birdiary_station" + } + } + self.MyClient.publish(mytopic, json.dumps(mpayload)) + time.sleep(0.5) diff --git a/config.yaml b/config.yaml index 6d0a125..4f04d7b 100755 --- a/config.yaml +++ b/config.yaml @@ -8,6 +8,19 @@ station: terminal_weight: 0 cameraRotation: 180 weightResetInMinutes: 20 +data-receiver: + mqtt: + # set to true(1)/false(0) to activate or deactivate the server + active: false + broker: 0.0.0.0 + # usually 1883 + brokerPort : 1883 + # username for the broker + username : + # password must be in clear Text + password : + # register to Home Assistant + discoverHASS: 1 misc: loglevel: 20 # loglevel can be either 0-NOTSET, 10-DEBUG, 20-INFO, 30-WARNING, 40-ERROR, 50-CRITICAL dev_mode: No # Yes - for testing, nothing is send to the server, No - normal usage, sync to server is enabled diff --git a/get_pi_requirements.sh b/get_pi_requirements.sh index 9d0b7ed..b5ec455 100755 --- a/get_pi_requirements.sh +++ b/get_pi_requirements.sh @@ -2,8 +2,7 @@ #get latest raspbian os packages testfile=~/station/restartmarker -if test -f "$testfile"; -then +if [ -f "$testfile" ]; then printf "\033[0;35m####### Welcome Back we will proceed with the tasks #######\n\n" rm restartmarker else @@ -14,8 +13,7 @@ else printf "\033[0;35mRestart now y/n ?[n]\033[0m\n" read -t 10 -n 1 Result - if [ "$Result" == y ] - then + if [ "$Result" == "y" ]; then printf "##### Restart now ####\n" printf "##### BYE! ####\n" touch restartmarker @@ -28,6 +26,9 @@ printf "\033[0;35m####### Install general python packages#######\033[0m\n" # General pip3 install pyyaml pip3 install schedule +pip3 install tzlocal +pip3 install pytz +pip3 install paho-mqtt pip3 install --upgrade numpy printf "\033[0;35m####### Install Senors #######\033[0m\n" diff --git a/main.py b/main.py index a75cf83..24f7710 100755 --- a/main.py +++ b/main.py @@ -1,10 +1,14 @@ import sys import shutil import time +from typing import Union + import schedule import numpy as np -from datetime import datetime, timedelta -import os +from datetime import datetime, timedelta +from tzlocal import get_localzone +import pytz +import os import yaml import json import logging @@ -12,7 +16,13 @@ import glob import requests - +# set some mqtt initial Variables +mqtt_active: bool = False +mqtt_broker: str = '' +mqtt_brokerPort: int = 1883 +mqtt_username: str = '' +mqtt_password: str = '' +mqtt_discoverHASS: bool = False dev_mode = False if not dev_mode: @@ -26,7 +36,7 @@ os.makedirs('environments') if not os.path.exists('movements'): os.makedirs('movements') -else: +else: files = glob.glob('movements/*') for f in files: os.remove(f) @@ -51,18 +61,18 @@ """ logger = logging.getLogger() -logger.setLevel(logging.DEBUG) +logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) -logging.info("Start setup!") +logging.info("Start setup!") # Read config.yaml file logging.info("Reading configuration file!") with open("config.yaml", 'r') as stream: yamlData = yaml.safe_load(stream) - + # parse loglevel if present try: # make it backward compatible to older Versions loglevel = yamlData["misc"]["loglevel"] @@ -83,13 +93,31 @@ serverUrl = yamlData["server"]["url"] boxId = yamlData["station"]["boxId"] -environmentTimeDeltaInMinutes = yamlData["station"]["environmentTimeDeltaInMinutes"] # waiting time to send environment requests -weightThreshold = yamlData["station"]["weightThreshold"] # weight which is the threshold to recognize a movement +environmentTimeDeltaInMinutes = yamlData["station"]["environmentTimeDeltaInMinutes"] # waiting time to send environment requests +weightThreshold = yamlData["station"]["weightThreshold"] # weight which is the threshold to recognize a movement terminal_weight = yamlData["station"]["terminal_weight"] # reference unit for the balance calibration_weight = yamlData["station"]["calibration_weight"] # reference unit for the balance camera_rotation = yamlData["station"]["cameraRotation"] weightResetInMinutes = yamlData["station"]["weightResetInMinutes"] +logger.debug('Configfile data: serverUrl: %s; boxId: %s; environmentTimeDeltaInMinutes: \ + %s; weightThreshold: %s; terminal_weight: %s ;calibration_weight: %s' + % (serverUrl, boxId, environmentTimeDeltaInMinutes, weightThreshold, terminal_weight, calibration_weight)) + +if yamlData["data-receiver"]["mqtt"]["active"] and yamlData["data-receiver"]["mqtt"]["active"] == True: + import MQTT + mqtt_broker = yamlData["data-receiver"]["mqtt"]["broker"] + mqtt_brokerPort = yamlData["data-receiver"]["mqtt"]["brokerPort"] + mqtt_username = yamlData["data-receiver"]["mqtt"]["username"] + mqtt_password = yamlData["data-receiver"]["mqtt"]["password"] + mqtt_discoverHASS = yamlData["data-receiver"]["mqtt"]["discoverHASS"] + mqtt_active = yamlData["data-receiver"]["mqtt"]["active"] + logger.debug( + 'MQTT Parameters brokerip: %s; brokerPort: %s;username: %s;password: ;discoverHASS: %s,Active %s' + % (mqtt_broker, mqtt_brokerPort, mqtt_username, mqtt_discoverHASS, mqtt_active)) +else: + logging.info("No MQTT configuration active") + logging.info('loglevel=' + str(loglevel)) logging.info('dev_mode=' + str(dev_mode)) logging.info('environmentTimeDeltaInMinutes=' + str(environmentTimeDeltaInMinutes)) @@ -110,7 +138,7 @@ logging.info("Setup DHT22!") import adafruit_dht from board import * -SENSOR_PIN = D16 # use not board but GPIO number +SENSOR_PIN = D16 # use not board but GPIO number dht22 = adafruit_dht.DHT22(SENSOR_PIN, use_pulseio=False) # Setup Balance @@ -142,8 +170,32 @@ from rec_unlimited import record from multiprocessing import Process -logging.info("Setup finished!") - +# Setup Mqtt Broker +logger.info("Setup MQTT!") + +if mqtt_active: + mqtt_timeout = 0 + logger.debug("mqtt is marked as active") + bvmqtt = MQTT.SendMQTT(mqtt_broker, mqtt_username, mqtt_password, mqtt_brokerPort) + bvmqtt.start() + while False == bvmqtt.connection_alive and mqtt_timeout <= 10: + logger.debug("Waiting for connection to MQTT-Broker. Attempt:%s" % mqtt_timeout) + time.sleep(1) + mqtt_timeout += 1 + else: + if bvmqtt.connection_alive: + logger.info("Connection to broker successful") + elif mqtt_timeout > 10: + logger.warning('Connection to broker not possible. Timed-out') + if mqtt_discoverHASS == 1 and bvmqtt.connection_alive: + logger.debug('HomeAssistant discovers is active') + bvmqtt.discover_HASS(boxId) + logger.debug('Home Assistant discovery send') +else: + logger.debug('MQTT is not active') + +logging.info("Setup finished!") + def write_environment(environment_data): filename = 'environments/' + environment_data['date'] + '.json' with open(filename, 'w') as wfile: @@ -153,9 +205,10 @@ def write_movement(movement_data): with open(data_filename, 'w') as jsonfile: jsonfile.write(movement_data) + # Function to send movement data to the server def send_realtime_movement(files): - + if dev_mode: logging.warning('send_movement deactivated') logging.warning('received: ' + str(video_filename) + ' ' + str(audio_filename) + ' ' + str(data_filename)) @@ -165,17 +218,47 @@ def send_realtime_movement(files): r = requests.post(serverUrl + 'movement/' + boxId, files=files, timeout=60) logging.info('Following movement data send: %s', files) logging.debug('Corresponding movement_id: %s', r.content) + return json.loads(r.content.decode()) except (requests.ConnectionError, requests.Timeout) as exception: logging.warning('No internet connection. ' + str(exception)) logging.warning('Saving files to send later') shutil.move(audio_filename, save_audio_filename) shutil.move(video_filename, save_video_filename) write_movement(files['json'][1]) + return "" else: os.remove(video_filename) os.remove(audio_filename) -def send_realtime_environment(environmentData): + +def send_movement_mqtt(box_id, movement_data=None): + if movement_data is None: + data = {} + logger.debug('Enter function send_movement_mqtt()') + if mqtt_active: + data: dict = movement_data + data["station_id"] = box_id + local_timezone = pytz.timezone(str(get_localzone())) + + # convert start_date and add timezone + start_date_iso: datetime = datetime.strptime(data["start_date"], "%Y-%m-%d %H:%M:%S.%f") + start_date_iso: datetime = local_timezone.localize(start_date_iso).isoformat() + print(start_date_iso) + # convert end_date and add timezone + end_date_iso: datetime = datetime.strptime(data["end_date"], "%Y-%m-%d %H:%M:%S.%f") + end_date_iso: datetime = local_timezone.localize(end_date_iso).isoformat() + data["start_date"] = start_date_iso + data["end_date"] = end_date_iso + # calculate movement duration + start_date = datetime.fromisoformat(start_date_iso) + end_date = datetime.fromisoformat(end_date_iso) + time_difference = end_date - start_date + data["duration"] = time_difference.total_seconds() + + # send Data + bvmqtt.sendData(f"birdiary/{box_id}/movement", json.dumps(data)) + +def send_realtime_environment(environmentData:dict,box_id): if dev_mode: logging.warning('send_environment deactivated') logging.warning('received: ' + str(environmentData)) @@ -184,33 +267,35 @@ def send_realtime_environment(environmentData): r = requests.post(serverUrl + 'environment/' + boxId, json=environmentData, timeout=20) logging.info('Following environment data send: %s', environmentData) logging.debug('Corresponding environment_id: %s', r.content) + if mqtt_active: + bvmqtt.sendData("birdiary/" + str(box_id) + "/environment", json.dumps(environmentData)) except (requests.ConnectionError, requests.Timeout) as exception: logging.warning('No internet connection. ' + str(exception)) logging.warning("Saving environment data to send later") write_environment(environmentData) else: - send_data() + send_data() -# Function to track a environment -def track_environment(): +# Function to track a environment +def track_environment(): try: - logging.info("Collect Environment Data") - environment = {} - environment["date"] = str(datetime.now()) - environment["temperature"] = dht22.temperature - environment["humidity"] = dht22.humidity - - logging.info("Environment Data: ") - logging.info(environment) - - send_realtime_environment(environment) - - global environmentData - environmentData = environment + logging.info("Collect Environment Data") + environment = {} + environment["date"] = str(datetime.now()) + environment["temperature"] = dht22.temperature + environment["humidity"] = dht22.humidity + + logging.info("Environment Data: ") + logging.info(environment) + + send_realtime_environment(environment, boxId) + + global environmentData + environmentData = environment except Exception as e: - logging.error(e) + logging.error(e) -# predefined variables +# predefined variables environmentData = None audio_filename = None video_filename = None @@ -229,7 +314,7 @@ def tare(): time.sleep(5) weight2 = hx.get_weight(15) logging.info("Measured weight:" + str(weight2) + " in valid range. Stop taring") - + @@ -245,101 +330,104 @@ def set_filenames(movementStartDate): global data_filename data_filename = 'savedMovements/' + str(movementStartDate) + '.json' -# Function to track a movement -def track_movement(): - values = [] - - - - - # schedule an environment track for every x minutes - schedule.every(environmentTimeDeltaInMinutes).minutes.do(track_environment) - schedule.every(weightResetInMinutes).minutes.do(tare) - - - - while True: - try: - schedule.run_pending() - - weight = hx.get_weight(15) - - if (weight < weightThreshold and len(values) == 0): - logging.info("Waiting for movement! (currently measured weight: " + str(weight) + ")") - - # start movement if weight higher than threshold is recognized - if (weight > weightThreshold and len(values) == 0): - logging.info("Movement recognized!") - movementStartDate = datetime.now() - set_filenames(movementStartDate) - - global recorder - recorder = Process(target=record, args=(temp_audio_filename,)) - recorder.start() - - camera.wait_recording(1) # continue camera recording - - values.append(weight) # add current weight to weight list - - else: - # continue movement if currently recognized weight is above threshold - if (weight > weightThreshold): - values.append(weight) - camera.wait_recording(1) - - logging.info("Currently measured weight: " + str(weight)) - - hx.reset() - - # stop movement if weight is below threshold - if (weight < weightThreshold): - if (len(values) >= 1): - logging.info("Movement ending!") - movementEndDate = datetime.now() - - duration = (movementEndDate - movementStartDate).total_seconds() - stream.copy_to(video_filename, seconds=duration+5) - stream.clear() - - movementData = {} - files = {} - movementData["start_date"] = str(movementStartDate) - movementData["end_date"] = str(movementEndDate) - movementData["audio"] = "audioKey" - movementData["weight"] = np.median(values) - movementData["video"] = "videoKey" - - # stop audio recording and move temporary file to output directory - terminate_recorder() - shutil.move(temp_audio_filename, audio_filename) - - files['audioKey'] = (os.path.basename(audio_filename), open(audio_filename, 'rb')) - files['videoKey'] = (os.path.basename(video_filename), open(video_filename, 'rb')) - - - if (environmentData != None): - movementData["environment"] = environmentData - else: - movementData["environment"] = {} - - logging.info("Movement Data: ") - logging.info(movementData) - - files["json"] = (None, json.dumps(movementData), 'application/json') - - send_realtime_movement(files) - - values = [] - - - except (KeyboardInterrupt, SystemExit): - cleanAndExit() - +# Function to track a movement +def track_movement(): + values = [] + + # schedule an environment track for every x minutes + schedule.every(environmentTimeDeltaInMinutes).minutes.do(track_environment) + schedule.every(weightResetInMinutes).minutes.do(tare) + schedule.every(15).minutes.do(lambda: bvmqtt.discover_HASS(boxId)) + + while True: + try: + schedule.run_pending() + + weight = hx.get_weight(15) + + if (weight < weightThreshold and len(values) == 0): + logging.info("Waiting for movement! (currently measured weight: " + str(weight) + ")") + + # start movement if weight higher than threshold is recognized + if (weight > weightThreshold and len(values) == 0): + logging.info("Movement recognized!") + movementStartDate = datetime.now() + set_filenames(movementStartDate) + + global recorder + recorder = Process(target=record, args=(temp_audio_filename,)) + recorder.start() + + camera.wait_recording(1) # continue camera recording + + values.append(weight) # add current weight to weight list + + else: + # continue movement if currently recognized weight is above threshold + if (weight > weightThreshold): + values.append(weight) + camera.wait_recording(1) + + logging.info("Currently measured weight: " + str(weight)) + + hx.reset() + + # stop movement if weight is below threshold + if (weight < weightThreshold): + if (len(values) >= 1): + logging.info("Movement ending!") + movementEndDate = datetime.now() + + duration = (movementEndDate - movementStartDate).total_seconds() + stream.copy_to(video_filename, seconds=duration+5) + stream.clear() + + movementData = {} + files = {} + movementData["start_date"] = str(movementStartDate) + movementData["end_date"] = str(movementEndDate) + movementData["audio"] = "audioKey" + movementData["weight"] = np.median(values) + movementData["video"] = "videoKey" + + # stop audio recording and move temporary file to output directory + terminate_recorder() + shutil.move(temp_audio_filename, audio_filename) + + files['audioKey'] = (os.path.basename(audio_filename), open(audio_filename, 'rb')) + files['videoKey'] = (os.path.basename(video_filename), open(video_filename, 'rb')) + + + if (environmentData != None): + movementData["environment"] = environmentData + else: + movementData["environment"] = {} + + logging.info("Movement Data: ") + logging.info(movementData) + + files["json"] = (None, json.dumps(movementData), 'application/json') + + movement_id = send_realtime_movement(files) + print("Movement ID:", movement_id) + + if not isinstance(movement_id, dict) or "id" not in movement_id: + print("Error: movement_id is not a dictionary or has no key 'id'") + else: + movementData["id"] = movement_id["id"] + print("Movement ID successful added to Movement_data:", movementData["id"]) + send_movement_mqtt(boxId, movementData) + + values = [] + + except (KeyboardInterrupt, SystemExit): + cleanAndExit() + def cleanAndExit(): camera.close() terminate_recorder() sys.exit(2) - + def terminate_recorder(): global recorder if recorder is not None and recorder.is_alive(): @@ -347,7 +435,7 @@ def terminate_recorder(): logging.info("terminated recorder") else: logging.debug("no alive recorder") - + #Function to send environment data to the server def send_environment(filename, server_url, box_id): try: @@ -355,7 +443,7 @@ def send_environment(filename, server_url, box_id): data = json.load(envFile) except: os.remove(filename) - + if dev_mode: logging.warning('send_environment deactivated') logging.warning('received: ' + str(data)) @@ -368,10 +456,10 @@ def send_environment(filename, server_url, box_id): logging.warning('No internet connection. ' + str(exception)) else: os.remove(filename) - + # Function to send movement data to the server def send_movement(video_filename, audio_filename, data_filename, server_url, box_id): - try: + try: with open(data_filename, 'r') as dataFile: data = json.load(dataFile) files = {} @@ -382,9 +470,9 @@ def send_movement(video_filename, audio_filename, data_filename, server_url, box os.remove(video_filename) os.remove(audio_filename) os.remove(data_filename) - - + + if dev_mode: logging.warning('send_movement deactivated') logging.warning('received: ' + str(video_filename) + ' ' + str(audio_filename) + ' ' + str(data_filename)) @@ -406,16 +494,16 @@ def send_data(): environmentFiles = sorted(glob.glob('environments/*.json')) videoFiles = sorted(glob.glob('savedMovements/*.h264')) audioFiles = sorted(glob.glob('savedMovements/*.wav')) - dataFiles = sorted(glob.glob('savedMovements/*.json')) + dataFiles = sorted(glob.glob('savedMovements/*.json')) for file in environmentFiles: send_environment(file, serverUrl, boxId) for (video, audio, data) in zip(videoFiles, audioFiles, dataFiles): send_movement(video, audio, data, serverUrl, boxId) logging.info('All stored data send!') - + logging.info("Start Birdiary!") -track_movement() +track_movement()