Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,11 @@ def generate_launch_description():
],
)

node_native_plugin = Node(
node_provision_plugin = Node(
namespace=LaunchConfiguration("muto_namespace"),
name="native_plugin",
name="provision_plugin",
package="composer",
executable="native_plugin",
executable="provision_plugin",
output="screen",
parameters=[
muto_params,
Expand Down Expand Up @@ -352,7 +352,7 @@ def generate_launch_description():
ld.add_action(node_twin)
ld.add_action(node_composer)
ld.add_action(node_compose_plugin)
ld.add_action(node_native_plugin)
ld.add_action(node_provision_plugin)
ld.add_action(node_launch_plugin)

return ld
Expand Down
70 changes: 67 additions & 3 deletions agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,21 @@ class MQTTConfig:
prefix: str = "muto"
name: str = ""


@dataclass
class SymphonyConfig:
"""Configuration for Symphony connection."""
mqtt: MQTTConfig = field(default_factory=MQTTConfig)
target: str = "muto-target"
enabled: bool = False
topic_prefix: str = "symphony"
api_url: str = "http://localhost:8082/v1alpha2/",
provider_name: str = "providers.target.mqtt",
broker_address: str = "tcp://mosquitto:1883",
client_id: str = "symphony",
request_topic: str = "coa-request",
response_topic: str = "coa-response",
timeout_seconds: int = 30,
auto_register: bool = False
@dataclass
class TopicConfig:
"""Configuration for ROS topics."""
Expand All @@ -60,6 +74,7 @@ class AgentConfig:
"""Main configuration for the Muto Agent."""
mqtt: MQTTConfig = field(default_factory=MQTTConfig)
topics: TopicConfig = field(default_factory=TopicConfig)
symphony: SymphonyConfig = field(default_factory=SymphonyConfig)


class ConfigurationManager:
Expand Down Expand Up @@ -107,6 +122,33 @@ def load_config(self) -> AgentConfig:
name=self._get_parameter("name", "")
)

sym_mqtt_config = MQTTConfig(
host=self._get_parameter("symphony_host", "sandbox.composiv.ai"),
port=self._get_parameter("symphony_port", 1883),
keep_alive=self._get_parameter("symphony_keep_alive", 60),
user=self._get_parameter("symphony_user", ""),
password=self._get_parameter("symphony_password", ""),
namespace=self._get_parameter("symphony_namespace", ""),
prefix=self._get_parameter("symphony_prefix", "muto"),
name=self._get_parameter("symphony_name", "")
)

symphony_config = SymphonyConfig(
mqtt=sym_mqtt_config,
target=self._get_parameter("symphony_target_name", "muto-target"),
enabled=self._get_parameter("symphony_enabled", False),

topic_prefix=self._get_parameter("symphony_topic_prefix", "symphony"),
api_url=self._get_parameter("symphony_api_url", "http://localhost:8082/v1alpha2/"),
provider_name=self._get_parameter("symphony_provider_name", "providers.target.mqtt"),
broker_address=self._get_parameter("symphony_broker_address", "tcp://mosquitto:1883"),
client_id=self._get_parameter("symphony_client_id", "symphony"),
request_topic=self._get_parameter("symphony_request_topic", "coa-request"),
response_topic=self._get_parameter("symphony_response_topic", "coa-response"),
timeout_seconds=self._get_parameter("symphony_timeout_seconds", 30),
auto_register=self._get_parameter("symphony_auto_register", False),
)

# Load topic configuration
topic_config = TopicConfig(
stack_topic=self._get_parameter("stack_topic", "stack"),
Expand All @@ -118,7 +160,7 @@ def load_config(self) -> AgentConfig:
thing_messages_topic=self._get_parameter("thing_messages_topic", "thing_messages")
)

self._config = AgentConfig(mqtt=mqtt_config, topics=topic_config)
self._config = AgentConfig(mqtt=mqtt_config, topics=topic_config, symphony=symphony_config)
self._validate_config()

self._node.get_logger().info("Configuration loaded successfully")
Expand Down Expand Up @@ -154,11 +196,33 @@ def _declare_parameters(self) -> None:
("name", ""),
("stack_topic", "stack"),
("twin_topic", "twin"),

("agent_to_gateway_topic", "agent_to_gateway"),
("gateway_to_agent_topic", "gateway_to_agent"),
("agent_to_commands_topic", "agent_to_command"),
("commands_to_agent_topic", "command_to_agent"),
("thing_messages_topic", "thing_messages")
("thing_messages_topic", "thing_messages"),

("symphony_enabled", False),
("symphony_host", "sandbox.composiv.ai"),
("symphony_port", 1883),
("symphony_keep_alive", 60),
("symphony_namespace", ""),
("symphony_prefix", "muto"),
('symphony_target_name', 'muto-device-001'),
('symphony_topic_prefix', 'symphony'),
('symphony_enable', False),
('symphony_api_url', 'http://localhost:8082/v1alpha2/'),
('symphony_user', 'admin'),
('symphony_password', ''),
('symphony_name', 'muto-device-001'),
('symphony_provider_name', 'providers.target.mqtt'),
('symphony_broker_address', 'tcp://mosquitto:1883'),
('symphony_client_id', 'symphony'),
('symphony_request_topic', 'coa-request'),
('symphony_response_topic', 'coa-response'),
('symphony_timeout_seconds', '30'),
('symphony_auto_register', False),
]

for param_name, default_value in parameters:
Expand Down
60 changes: 46 additions & 14 deletions agent/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

# Standard library imports
import json
import signal
import threading
from typing import Optional

# Third-party imports
Expand Down Expand Up @@ -311,26 +313,56 @@ def is_mqtt_connected(self) -> bool:


def main():
"""Main entry point for the MQTT Gateway."""
rclpy.init()
"""Main entry point for the Muto MQTT."""
provider = None
shutdown_requested = threading.Event()

def signal_handler(signum, frame):
"""Handle shutdown signals gracefully."""
print(f"Received signal {signum}, initiating graceful shutdown...")
shutdown_requested.set()
if provider is not None:
provider._shutdown_event.set()

# Register signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

try:
gateway = MQTT()
gateway.initialize()
rclpy.init()
provider = MQTT()
provider.initialize()

provider.get_logger().info("Muto MQTT started successfully")

gateway.get_logger().info("MQTT Gateway started successfully")
rclpy.spin(gateway)
# Custom spin loop to handle shutdown gracefully
while rclpy.ok() and not shutdown_requested.is_set():
try:
rclpy.spin_once(provider, timeout_sec=1.0)
except KeyboardInterrupt:
break

except KeyboardInterrupt:
print("Muto MQTT interrupted by user")
except Exception as e:
print(f"Failed to start MQTT Gateway: {e}")
print(f"Failed to start Muto MQTT: {e}")

finally:
# Cleanup provider if it was created
if provider is not None:
try:
print("Cleaning up Muto MQTT...")
provider.cleanup()
except Exception as e:
print(f"Error during provider cleanup: {e}")

# Only shutdown ROS2 if it's still initialized and we haven't already shut it down
try:
gateway.cleanup()
except:
pass
rclpy.shutdown()

if rclpy.ok():
print("Shutting down ROS2...")
rclpy.shutdown()
except Exception as e:
print(f"Error during ROS2 shutdown (this may be normal): {e}")

if __name__ == "__main__":
main()
if __name__ == '__main__':
exit(main())
21 changes: 14 additions & 7 deletions agent/mqtt_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class MQTTConnectionManager(ConnectionManager):
automatic reconnection, proper error handling, and connection monitoring.
"""

def __init__(self, node: BaseNode, config: MQTTConfig, message_handler: Callable[[MQTTMessage], None], logger: Optional[Any] = None):
def __init__(self, node: BaseNode, config: MQTTConfig, message_handler: Callable[[MQTTMessage], None], on_connect_handler: Optional[Callable] = None, logger: Optional[Any] = None):
"""
Initialize the MQTT connection manager.

Expand All @@ -53,6 +53,7 @@ def __init__(self, node: BaseNode, config: MQTTConfig, message_handler: Callable
"""
self._config = config
self._message_handler = message_handler
self._on_connect_handler = on_connect_handler
self._client: Optional[Client] = None
self._connected = False
self._node = node
Expand Down Expand Up @@ -209,14 +210,20 @@ def _on_connect(self, client, userdata, flags, reason_code, properties) -> None:
reason_code: Connection result code.
properties: MQTT v5 properties.
"""


if reason_code == 0:
self._connected = True

# Subscribe to twin topic
twin_topic = f"{self._config.prefix}/{self._config.namespace}:{self._config.name}"
self.subscribe(twin_topic)

self.get_logger().info(f"MQTT connected and subscribed to {twin_topic}")

# If there is a self.on_connect_handler use it otherwise default
if self._on_connect_handler is not None:
self._on_connect_handler(client, userdata, flags, reason_code, properties)
else:
# default behavior
# Subscribe to twin topic
twin_topic = f"{self._config.prefix}/{self._config.namespace}:{self._config.name}"
self.subscribe(twin_topic)
self.get_logger().info(f"MQTT connected and subscribed to {twin_topic}")
else:
self._connected = False
self.get_logger().error(f"MQTT connection failed with reason code: {reason_code}")
Expand Down
43 changes: 37 additions & 6 deletions agent/muto_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
"""

# Standard library imports
import signal
import threading
from typing import Optional, Tuple

# Third-party imports
Expand Down Expand Up @@ -253,27 +255,56 @@ def is_ready(self) -> bool:

def main():
"""Main entry point for the Muto Agent."""
rclpy.init()
agent = None
shutdown_requested = threading.Event()

def signal_handler(signum, frame):
"""Handle shutdown signals gracefully."""
print(f"Received signal {signum}, initiating graceful shutdown...")
shutdown_requested.set()

# Register signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

try:
rclpy.init()
agent = MutoAgent()
agent.initialize()

if agent.is_ready():
agent.get_logger().info("Muto Agent started successfully")
rclpy.spin(agent)

# Custom spin loop to handle shutdown gracefully
while rclpy.ok() and not shutdown_requested.is_set():
try:
rclpy.spin_once(agent, timeout_sec=1.0)
except KeyboardInterrupt:
break
else:
agent.get_logger().error("Muto Agent failed to initialize properly")

except KeyboardInterrupt:
print("Muto Agent interrupted by user")
except Exception as e:
print(f"Failed to start Muto Agent: {e}")

finally:
# Cleanup agent if it was created
if agent is not None:
try:
print("Cleaning up Muto Agent...")
agent.cleanup()
except Exception as e:
print(f"Error during agent cleanup: {e}")

# Only shutdown ROS2 if it's still initialized and we haven't already shut it down
try:
agent.cleanup()
except:
pass
rclpy.shutdown()
if rclpy.ok():
print("Shutting down ROS2...")
rclpy.shutdown()
except Exception as e:
print(f"Error during ROS2 shutdown (this may be normal): {e}")


if __name__ == "__main__":
Expand Down
56 changes: 56 additions & 0 deletions agent/symphony/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# SDK Package

The `agent.symphony.sdk` package contains an SDK for Symphony data structures and utilities:

- **symphony_api.py**: REST API client and authentication
- **symphony_sdk.py**: COA protocol, data structures, serialization
- **symphony_summary.py**: Deployment summary and result models
- **symphony_types.py**: State enums and constants

## Package Structure

```
src/agent/agent/symphony/
├── __init__.py # Package exports and convenience imports
├── symphony_broker.py # MQTT broker integration
├── symphony_provider.py # Main Symphony provider implementation
└── sdk/ # Core SDK components
├── __init__.py # SDK exports and convenience imports
├── symphony_api.py # REST API client for Symphony
├── symphony_sdk.py # COA data structures and SDK
├── symphony_summary.py # Summary models and result handling
└── symphony_types.py # State enums and constants
```

## Import
```python
# Convenient imports from main symphony package
from agent.symphony import COARequest, COAResponse, State

# SDK-specific imports
from agent.symphony.sdk import SummaryResult, SymphonyAPIClient

# Provider/broker imports
from agent.symphony.symphony_provider import MutoSymphonyProvider

# Direct module access
from agent.symphony.sdk.symphony_types import State as SymphonyState
```


### SDK Usage Examples

```python
# Import everything from SDK
from agent.symphony.sdk import *

# Create COA request/response
request = COARequest(method='GET', route='/targets')
response = COAResponse.success({'targets': []})

# Use summary models
summary = SummarySpec(target_count=1, success_count=1)

# API client usage
client = SymphonyAPIClient(base_url='http://localhost:8082/v1alpha2/')
```
Loading