Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
f86a2b9
more notes
anselanza Apr 11, 2025
8d47fd5
POC: the channel owns an Rc ref to the Tether Agent, which means it can
anselanza Apr 11, 2025
abb885d
Event better: no need for Rc, just ref with lifetime
anselanza Apr 11, 2025
00275eb
Restructured sender module for ChannelSender, demo auto-encoding (but
anselanza Apr 11, 2025
f349e86
POC strictly typed senders
anselanza Apr 11, 2025
97d27a6
Enable some agent-level sending/publishing as well, for convenience
anselanza Apr 11, 2025
3508025
POC: typed receiver that can auto-subscribe; but not emit messages (yet)
anselanza Apr 11, 2025
96fc7d7
POC: the ChannelReceiver checks and parses messages, "emitting" any m…
anselanza Apr 11, 2025
eae9628
demonstrate multiple matches
anselanza Apr 11, 2025
04dd158
WIP must make OptionsBuilder work again
anselanza Apr 11, 2025
503674f
When creating via OptionsBuilder, borrow checking is again a problem …
anselanza Apr 14, 2025
b332e68
Creating Definitions separately from Channels
anselanza Apr 14, 2025
a075f12
options to construct Channels with name only or with proper definitio…
anselanza Apr 14, 2025
64bdbdf
avoiding double-naming modules
anselanza Apr 14, 2025
871ad6c
WIP trying automatic check messages ON channel
anselanza Apr 14, 2025
16c80f3
Channel Senders are not properly typed (compiler does not catch errors)
anselanza Apr 17, 2025
7ed5a7d
Sending channels are properly type-checked
anselanza Apr 17, 2025
c617d09
Removed a lot of redundancy between "definitions", "channels" and th…
anselanza Apr 17, 2025
af88504
Following the logic of the Rust library, allow option to create chann…
anselanza Apr 17, 2025
df6d498
Examples updated for shorter Channel creation functions, also support…
anselanza Apr 17, 2025
a79446e
updated some notes, minor Svelte example import fixes
anselanza Apr 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions Tether4.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ For Output Plugs (publishing) NOW CHANNEL SENDERS, the topic will be constructed
- agentRole/chanelName/optionalID

For Input Plugs (subscribing) NOW CHANNEL RECEIVERS, the topic will be constructed as follows:
- agentRole/chanelName/# (matches "no ID" part and "ID part included")
- agentRole/chanelName/# (matches "no ID" part and "ID part(s) included")
- agentRole/chanelName/optionalID (will only match when ID part is matched)

The main practical difference between a "topic" and a "Channel" (previously "plug") is simply that a Channel is expected to match ONLY ONE TYPE OF MESSAGE. So, a single MQTT Client may have multiple subscriptions, but we ensure that the correct messages are matched with the correct Channel when received, by applying our additional Tether Complaint Topic (TCT) matching pattern.
The main practical difference between a "topic" and a "Channel" (previously "plug") is simply that a Channel is expected to match ONLY ONE TYPE OF MESSAGE. So, a single MQTT Client may have multiple subscriptions, but we ensure that the correct messages are matched with the correct Channel when received, by applying our additional Tether Complaint Topic (TCT) matching pattern. The libraries (particularly typed languages such as TypeScript and Rust) should try to encourage (if not enforce) this practice.

## Cleaning up
Unused "utilities" and the "explorer" will be removed.
Expand Down Expand Up @@ -49,3 +49,18 @@ Apart from the terminology changes, the following are important to note:
## Rust changes
Apart from the terminology changes, the following are important to note:
- `agent.send` used to assume an already-encoded payload, while `.encode_and_send` did auto-encoding. Now, `.send` is the auto-encoding version and additional `.send_raw` and `.send_empty` functions are provided. It is VERY important that the new `.send` will actually (incorrectly!) accept already-encoded payloads, because `&[u8]` is ALSO `T: Serialize`! So applications using the new version must be carefully checked to ensure that things are not (double) encoded before sending!

The term "OptionsBuilder" suffix has now been replaced with the much simpler "Builder", so we have simply:
- TetherAgentBuilder
- ChannelSenderBuilder
- ChannelReceiverBuilder

Even better, the ChannelSenderBuilder/ChannelReceiver builder do not **have** to be used in all cases, since both ChannelSender and ChannelReceiver objects can be constructed via the Tether Agent object itself, i.e.

- `tether_agent::create_sender`
- `tether_agent::create_receiver`

All that needs to be provided, in the default cases, is the name and the type. For example:
- `tether_agent.create_sender::<u8>("numbersOnly")` creates a ChannelSender called "numbersOnly" which will automatically expect (require) u8 payloads

The TypeScript library is now set up to mirror this as well (also, optional!). It means having to pass fewer arguments.
17 changes: 8 additions & 9 deletions examples/nodejs-ts/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@ logger.debug("Debug logging enabled; output could be verbose!");
const main = async () => {
const agent = await TetherAgent.create("brain");

const sender = new ChannelSender(agent, "randomValues");
// Note the alternative syntax for doing the same thing, below:
// ...
// const sender = new ChannelSender(agent, "randomValues");
const sender = agent.createSender("randomValues");

sender.send({
value: Math.random(),
timestamp: Date.now(),
something: "one",
});

const genericReceiver = await ChannelReceiver.create(
agent,
const genericReceiver = await agent.createReceiver(
"randomValuesStrictlyTyped"
);
genericReceiver.on("message", (payload, topic) => {
Expand All @@ -38,18 +41,14 @@ const main = async () => {
);
});

const typedReceiver = await ChannelReceiver.create<number>(
agent,
const typedReceiver = await agent.createReceiver<number>(
"randomValuesStrictlyTyped"
);
typedReceiver.on("message", (payload) => {
logger.info("Our typed receiver got", payload, typeof payload);
});

const typedSender = new ChannelSender<number>(
agent,
"randomValuesStrictlyTyped"
);
const typedSender = agent.createSender<number>("randomValuesStrictlyTyped");
// This will be rejected by TypeScript compiler:
// typedSender.send({
// value: Math.random(),
Expand Down
10 changes: 6 additions & 4 deletions examples/react-ts/src/Tether/Receiver.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ interface Props {
}

export const Receiver = (props: Props) => {
const { agent } = props;
const [channel, setChannel] = useState<ChannelReceiver<unknown> | null>(null);
const [lastMessage, setLastMessage] = useState("");

useEffect(() => {
ChannelReceiver.create(props.agent, "everything", {
overrideTopic: "#",
})
agent
.createReceiver("everything", {
overrideTopic: "#",
})
.then((channel) => {
setChannel(channel);
channel.on("message", (payload, topic) => {
Expand All @@ -24,7 +26,7 @@ export const Receiver = (props: Props) => {
.catch((e) => {
console.error("Error creating Channel Receiver:", e);
});
}, [props.agent]);
}, [agent]);

return (
<div>
Expand Down
16 changes: 9 additions & 7 deletions examples/react-ts/src/Tether/Sender.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ interface Props {
}

export const Sender = (props: Props) => {
const { agent } = props;

useEffect(() => {
console.log("Sender useEffect");
setChannel(new ChannelSender(props.agent, "sender"));
}, [props.agent]);
setChannel(new ChannelSender(agent, "sender"));
}, [agent]);

const [useCustomTopic, setUseCustomTopic] = useState(false);
const [customTopic, setTCustomTopic] = useState("");
Expand All @@ -32,7 +34,7 @@ export const Sender = (props: Props) => {
<button
onClick={() =>
setChannel(
new ChannelSender(props.agent, "sender", {
agent.createSender("sender", {
overrideTopic: customTopic,
})
)
Expand All @@ -43,7 +45,7 @@ export const Sender = (props: Props) => {
<button
onClick={() => {
setUseCustomTopic(false);
setChannel(new ChannelSender(props.agent, "sender"));
setChannel(agent.createSender("sender"));
}}
>
Back to default
Expand All @@ -65,9 +67,9 @@ export const Sender = (props: Props) => {
await channel.send();
} catch (e) {
console.error("We got an error when trying to publish:", e);
console.log("agent connected?", props.agent.getIsConnected());
console.log("agent state?", props.agent.getState());
console.log("agent client?", props.agent.getClient());
console.log("agent connected?", agent.getIsConnected());
console.log("agent state?", agent.getState());
console.log("agent client?", agent.getClient());
}
}}
>
Expand Down
51 changes: 26 additions & 25 deletions examples/rust/custom_options.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::time::Duration;

use tether_agent::{
ChannelDefinition, ChannelDefinitionCommon, ChannelOptionsBuilder, TetherAgentOptionsBuilder,
};
use tether_agent::{definitions::*, TetherAgentBuilder};

fn main() {
let mut tether_agent = TetherAgentOptionsBuilder::new("example")
let tether_agent = TetherAgentBuilder::new("example")
.id(None)
.host(Some("localhost"))
.port(Some(1883))
Expand All @@ -14,41 +12,44 @@ fn main() {
.build()
.expect("failed to create Tether Agent");

let sender_channel = ChannelOptionsBuilder::create_sender("anOutput")
let sender_channel_def = ChannelSenderBuilder::new("anOutput")
.role(Some("pretendingToBeSomethingElse"))
.qos(Some(2))
.retain(Some(true))
.build(&mut tether_agent)
.expect("failed to create sender channel");
let input_wildcard_channel = ChannelOptionsBuilder::create_receiver("everything")
.topic(Some("#"))
.build(&mut tether_agent);
.build(&tether_agent);

let input_customid_channel = ChannelOptionsBuilder::create_receiver("someData")
.role(None) // i.e., just use default
.id(Some("specificIDonly"))
.build(&mut tether_agent);
let sender_channel = tether_agent.create_sender_with_definition(sender_channel_def);

let input_wildcard_channel_def = ChannelReceiverBuilder::new("everything")
.override_topic(Some("#"))
.build();
let input_wildcard_channel = tether_agent
.create_receiver_with_definition::<u8>(input_wildcard_channel_def)
.expect("failed to create Channel Receiver");

// let input_customid_channel_def = ChannelReceiverOptions::new("someData")
// .role(None) // i.e., just use default
// .id(Some("specificIDonly"))
// .build();

println!("Agent looks like this: {:?}", tether_agent.description());
let (role, id, _) = tether_agent.description();
assert_eq!(role, "example");
assert_eq!(id, "any"); // because we set None

if let ChannelDefinition::ChannelSender(p) = &sender_channel {
println!("sender channel: {:?}", p);
assert_eq!(
p.generated_topic(),
"pretendingToBeSomethingElse/any/anOutput"
);
}

println!("wildcard input channel: {:?}", input_wildcard_channel);
println!("speific ID input channel: {:?}", input_customid_channel);
println!(
"wildcard input channel: {:?}",
input_wildcard_channel.definition().generated_topic()
);
// println!(
// "speific ID input channel: {:?}",
// input_customid_channel_def.generated_topic()
// );

let payload =
rmp_serde::to_vec::<String>(&String::from("boo")).expect("failed to serialise payload");
tether_agent
.send(&sender_channel, Some(&payload))
.send(&sender_channel, &payload)
.expect("failed to publish");

std::thread::sleep(Duration::from_millis(4000));
Expand Down
123 changes: 16 additions & 107 deletions examples/rust/receive.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::{thread, time::Duration};

use env_logger::{Builder, Env};
use log::{debug, info, warn};
use rmp_serde::from_slice;
use log::*;
use serde::Deserialize;
use tether_agent::{ChannelOptionsBuilder, TetherAgentOptionsBuilder};
use tether_agent::TetherAgentBuilder;

#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct CustomMessage {
id: usize,
Expand All @@ -17,127 +15,38 @@ struct CustomMessage {
fn main() {
println!("Rust Tether Agent subscribe example");

let mut builder = Builder::from_env(Env::default().default_filter_or("debug"));
let mut builder = Builder::from_env(Env::default().default_filter_or("info"));
builder.filter_module("tether_agent", log::LevelFilter::Warn);
builder.filter_module("rumqttc", log::LevelFilter::Warn);
builder.init();

debug!("Debugging is enabled; could be verbose");

let mut tether_agent = TetherAgentOptionsBuilder::new("RustDemo")
let tether_agent = TetherAgentBuilder::new("RustDemo")
.id(Some("example"))
.build()
.expect("failed to init Tether agent");

let input_one = ChannelOptionsBuilder::create_receiver("one")
.build(&mut tether_agent)
.expect("failed to create input");
info!(
"input one {} = {}",
input_one.name(),
input_one.generated_topic()
);
let input_two = ChannelOptionsBuilder::create_receiver("two")
.role(Some("specific"))
.build(&mut tether_agent)
.expect("failed to create input");
info!(
"input two {} = {}",
input_two.name(),
input_two.generated_topic()
);
let input_empty = ChannelOptionsBuilder::create_receiver("nothing")
.build(&mut tether_agent)
.expect("failed to create input");

let input_everything = ChannelOptionsBuilder::create_receiver("everything")
.topic(Some("#"))
.build(&mut tether_agent)
.expect("failed to create input");

let input_specify_id = ChannelOptionsBuilder::create_receiver("groupMessages")
.id(Some("someGroup"))
.name(None)
.build(&mut tether_agent)
.expect("failed to create input");

debug!(
"input everything {} = {}",
input_everything.name(),
input_everything.generated_topic()
);
let receiver_of_numbers = tether_agent
.create_receiver::<u8>("numbersOnly")
.expect("failed to create receiver");

info!("Checking messages every 1s, 10x...");
let receiver_of_custom_structs = tether_agent
.create_receiver::<CustomMessage>("customStructs")
.expect("failed to create receiver");

loop {
debug!("Checking for messages...");
while let Some((topic, payload)) = tether_agent.check_messages() {
// debug!(
// "........ Received a message topic {:?} => topic parts {:?}",
// topic, topic
// );

if input_one.matches(&topic) {
info!(
"******** INPUT ONE:\n Received a message for plug named \"{}\" on topic {:?} with length {} bytes",
input_one.name(),
topic,
payload.len()
);
// assert_eq!(parse_plug_name(topic.un), Some("one"));
if let Some(m) = receiver_of_numbers.parse(&topic, &payload) {
info!("Decoded a message for our 'numbers' Channel: {:?}", m);
}
if input_two.matches(&topic) {
if let Some(m) = receiver_of_custom_structs.parse(&topic, &payload) {
info!(
"******** INPUT TWO:\n Received a message for plug named \"{}\" on topic {:?} with length {} bytes",
input_two.name(),
topic,
payload.len()
);
// assert_eq!(parse_plug_name(message.topic()), Some("two"));
// assert_ne!(parse_plug_name(message.topic()), Some("one"));

// Notice how you must give the from_slice function a type so it knows what to expect
let decoded = from_slice::<CustomMessage>(&payload);
match decoded {
Ok(d) => {
info!("Yes, we decoded the MessagePack payload as: {:?}", d);
let CustomMessage { name, id } = d;
debug!("Name is {} and ID is {}", name, id);
}
Err(e) => {
warn!("Failed to decode the payload: {}", e)
}
};
}
if input_empty.matches(&topic) {
info!(
"******** EMPTY MESSAGE:\n Received a message for plug named \"{}\" on topic {:?} with length {} bytes",
input_empty.name(),
topic,
payload.len()
);
// assert_eq!(parse_plug_name(topic), Some("nothing"));
}
if input_everything.matches(&topic) {
info!(
"******** EVERYTHING MATCHES HERE:\n Received a message for plug named \"{}\" on topic {:?} with length {} bytes",
input_everything.name(),
topic,
payload.len()
"Decoded a message for our 'custom structs' Channel: {:?}",
m
);
}
if input_specify_id.matches(&topic) {
info!("******** ID MATCH:\n Should match any role and plug name, but only messages with ID \"groupMessages\"");
info!(
"\n Received a message from plug named \"{}\" on topic {:?} with length {} bytes",
input_specify_id.name(),
topic,
payload.len()
);
// assert_eq!(parse_agent_id(message.topic()), Some("groupMessages"));
}
}

thread::sleep(Duration::from_millis(1000))
}
}
Loading
Loading