Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion code/sys_shd/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta"
# users can install this project, e.g.:
#
# $ pip install <sampleproject>
name = "system_shared_tool"
name = "system_shared_tool_rt"

# Versions should comply with PEP 440:
# https://www.python.org/dev/peps/pep-0440/
Expand Down
1 change: 1 addition & 0 deletions code/sys_shd/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ posix-ipc>=1.1.1
PyYAML>=6.0.1
system-config-tool>=0.0.7
system-logger-tool>=0.2.2
RPi-GPIO>=0.7.1
3 changes: 2 additions & 1 deletion code/sys_shd/src/system_shared_tool/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
This file specifies what is going to be exported from this module.
In this case is sys_shd.
"""
from .sys_shd_common import SysShdErrorC, SysShdNodeParamsC, SysShdNodeC, SysShdNodeStatusE
from .sys_shd_common import (SysShdErrorC, SysShdNodeParamsC, SysShdNodeC, SysShdNodeStatusE,
SysShdPeripheralC)
from .sys_shd_channels import SysShdIpcChanC, SysShdChanC
from .sys_shd_objects import SysShdSharedObjC
__all__= ['SysShdChanC', 'SysShdSharedObjC', 'SysShdErrorC', 'SysShdNodeStatusE',
Expand Down
6 changes: 4 additions & 2 deletions code/sys_shd/src/system_shared_tool/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
# For further information check out README.md
DEFAULT_CHAN_NUM_MSG : int = 100 # Max number of allowed message per chan
DEFAULT_IPC_MSG_SIZE : int = 100 # Size of message sent through IPC message queue
DEFAULT_CHAN_TIMEOUT : int = 1
DEFAULT_CHAN_TIMEOUT : int|None = None # Timeout for IPC message queue if None while wait forever
DEFAULT_GPIO_CONFIG_PATH : str = 'config_gpio.yaml'

CONSTANTS_NAMES = ('DEFAULT_CHAN_NUM_MSG', 'DEFAULT_IPC_MSG_SIZE', 'DEFAULT_CHAN_TIMEOUT')
CONSTANTS_NAMES = ('DEFAULT_CHAN_NUM_MSG', 'DEFAULT_IPC_MSG_SIZE', 'DEFAULT_CHAN_TIMEOUT',
'DEFAULT_GPIO_CONFIG_PATH')
sys_conf_update_config_params(context=globals(),
constants_names=CONSTANTS_NAMES)
2 changes: 1 addition & 1 deletion code/sys_shd/src/system_shared_tool/sys_shd_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def delete_until_last(self) -> None:
while self.current_messages > 0:
self.receive(timeout = 0)

def receive_data(self, timeout: int = DEFAULT_CHAN_TIMEOUT) -> object:
def receive_data(self, timeout: int|None = DEFAULT_CHAN_TIMEOUT) -> object:
'''
Pop the first element from the queue and return it. If queue is empty,
wait until a new element is pushed to the queue.
Expand Down
40 changes: 36 additions & 4 deletions code/sys_shd/src/system_shared_tool/sys_shd_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,28 @@
from enum import Enum
from time import time, sleep
####################### SYSTEM ABSTRACTION IMPORTS #######################
from system_config_tool import sys_conf_read_config_params
from system_logger_tool import Logger, SysLogLoggerC, sys_log_logger_get_module_logger

if __name__ == "__main__":
cycler_logger = SysLogLoggerC()
log: Logger = sys_log_logger_get_module_logger(__name__)
####################### THIRD PARTY IMPORTS #######################
from RPi.GPIO import setmode, BCM, BOARD, setup, output, HIGH, LOW, OUT #pylint: disable= no-name-in-module

####################### MODULE IMPORTS #######################

####################### PROJECT IMPORTS #######################

####################### ENUMS #######################

####################### CLASSES #######################
###################### CONSTANTS ######################
from .context import DEFAULT_GPIO_CONFIG_PATH
gpio_mode = sys_conf_read_config_params(filename=DEFAULT_GPIO_CONFIG_PATH, section= 'GPIO_BOARD')
if gpio_mode == 'BOARD':
setmode(BOARD)
else:
setmode(BCM)
_TO_MS = 1000

####################### ENUMS #######################
class SysShdNodeStatusE(Enum):
'''
Enum class for the node state
Expand All @@ -37,6 +43,7 @@ class SysShdNodeStatusE(Enum):
INIT = 2
STOP = 3

####################### CLASSES #######################
class SysShdNodeParamsC:
'''
Class that contains the can parameters in order to create the thread correctly
Expand Down Expand Up @@ -66,6 +73,8 @@ def __init__(self,name: str, cycle_period: int, working_flag : Event,
working_flag (Event): [description]
node_params (SysShdNodeParamsC, optional): .Defaults to SysShdNodeParamsC().
'''
port: int = sys_conf_read_config_params(filename=DEFAULT_GPIO_CONFIG_PATH, section= name)
self.gpio: SysShdPeripheralC = SysShdPeripheralC(port=port)
super().__init__(group = None, target = node_params.target, name = name,
args = node_params.args, kwargs = node_params.kwargs,
daemon = node_params.daemon)
Expand Down Expand Up @@ -102,7 +111,9 @@ def run(self) -> None:
while self.working_flag.is_set():
try:
next_time = time()+self.cycle_period/_TO_MS
self.gpio.set_gpio_up()
self.process_iteration()
self.gpio.set_gpio_down()
# Sleep the remaining time
sleep_time = next_time-int(time())
# sleep_time is measure in miliseconds
Expand Down Expand Up @@ -135,3 +146,24 @@ def __init__(self, message) -> None:
message (str): explanation of the error
'''
super().__init__(message)

class SysShdPeripheralC:
'''
Class that contains the common methods to toggle gpios.
'''
def __init__(self, port: int) -> None:
self.port: int = port
setup(port, OUT)
def set_gpio_up(self) -> None:
"""
Sets the GPIO pin to a high state.
"""
log.debug(f"Setting GPIO {self.port} up")
output(self.port, HIGH)

def set_gpio_down(self) -> None:
"""
Sets the GPIO pin to a low state.
"""
log.debug(f"Setting GPIO {self.port} down")
output(self.port, LOW)