diff --git a/src/models/config/mod.rs b/src/models/config/mod.rs index 745a76d..0574f3d 100644 --- a/src/models/config/mod.rs +++ b/src/models/config/mod.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use serde::{Serialize, Deserialize}; use twilight_model::id::Id; use twilight_model::id::marker::{ApplicationMarker, GuildMarker}; diff --git a/src/server/guild/editing.rs b/src/server/guild/editing.rs index 07dcd0a..d8b2000 100644 --- a/src/server/guild/editing.rs +++ b/src/server/guild/editing.rs @@ -1,8 +1,9 @@ -use std::collections::{BTreeSet, HashMap}; +use std::collections::HashMap; use std::sync::Arc; +use json_patch::Patch; use mongodb::bson::doc; use mongodb::bson::oid::ObjectId; -use serde_json::{Map, Value}; +use serde::Serialize; use tokio::sync::{Mutex, RwLock}; use tracing::{error, warn}; use twilight_model::id::Id; @@ -11,18 +12,22 @@ use crate::context::Context; use crate::models::config::GuildConfig; use crate::server::guild::ws::{Connection, OutboundAction, OutboundMessage}; +#[derive(Clone, Debug, Serialize)] +pub struct Change { + pub author_id: Id, + pub changes: Patch +} + struct GuildEditingState { pub connections: Vec>, - pub changes: Value, - pub edited_by: BTreeSet> + pub changes: Vec, } impl Default for GuildEditingState { fn default() -> Self { Self { connections: vec![], - changes: Value::Object(Map::new()), - edited_by: Default::default(), + changes: vec![], } } } @@ -59,14 +64,16 @@ impl GuildsEditing { pub async fn marge_changes( &self, - author: Id, + author_id: Id, guild_id: Id, - changes: Value + changes: Patch ) -> Option<()> { let guild = self.get_guild(guild_id).await?; let mut guild_lock = guild.lock().await; - json_patch::merge(&mut guild_lock.changes, &changes); - guild_lock.edited_by.insert(author); + guild_lock.changes.push(Change { + author_id, + changes + }); Some(()) } @@ -75,7 +82,55 @@ impl GuildsEditing { list_lock.get(&guild_id).cloned() } - pub async fn broadcast_changes(&self, context: &Arc, guild_id: Id) -> Option<()> { + pub async fn broadcast_users(&self, guild_id: Id) -> Option<()> { + let guild = self.get_guild(guild_id).await?; + let guild_lock = guild.lock().await; + + let users = guild_lock.connections + .iter().map(|connection| connection.user_id) + .collect::>>(); + + for connection in &guild_lock.connections { + let _ = connection.tx.send(OutboundAction::Message(OutboundMessage::OverwriteUsers(users.to_owned()))); + } + + Some(()) + } + + pub async fn broadcast_change( + &self, guild_id: Id, author_id: Id, changes: Patch + ) -> Option<()> { + let guild = self.get_guild(guild_id).await?; + let guild_lock = guild.lock().await; + + for connection in &guild_lock.connections { + let _ = connection.tx.send(OutboundAction::Message(OutboundMessage::PushChange(Change { + author_id, + changes: changes.to_owned() + }))); + } + + Some(()) + } + + pub async fn get_initialization_data(&self, context: &Arc, guild_id: Id) + -> Option<(GuildConfig, Vec, Vec>)> { + let config = context.mongodb + .get_config(guild_id) + .await + .inspect_err(|error| error!(name: "mongodb error", ?error)) + .ok()?; + + let guild = self.get_guild(guild_id).await?; + let guild_lock = guild.lock().await; + let users = guild_lock.connections + .iter().map(|connection| connection.user_id) + .collect::>>(); + + Some((config.to_owned(), guild_lock.changes.to_owned(), users)) + } + + pub async fn broadcast_config_overwrite(&self, context: &Arc, guild_id: Id) -> Option<()> { let config = context.mongodb .get_config(guild_id) .await @@ -89,10 +144,9 @@ impl GuildsEditing { .collect::>>(); for connection in &guild_lock.connections { - let _ = connection.tx.send(OutboundAction::Message(OutboundMessage::UpdateConfigurationData { + let _ = connection.tx.send(OutboundAction::Message(OutboundMessage::OverwriteConfigurationData { saved_config: config.to_owned(), changes: guild_lock.changes.to_owned(), - users: users.to_owned(), })); } @@ -112,9 +166,14 @@ impl GuildsEditing { let mut new_config = serde_json::to_value(config) .inspect_err(|error| error!(name: "cannot convert guild config to value", ?error)) .ok()?; - json_patch::merge(&mut new_config, &guild_lock.changes); + for patch in &guild_lock.changes { + json_patch::patch(&mut new_config, &patch.changes) + .inspect_err(|error| error!(name: "error applying patch to guild config", ?patch, ?error)) + .ok()?; + } + let new_config: GuildConfig = serde_json::from_value(new_config) - .inspect_err(|error| error!(name: "cannot marge edits with guild config", ?error)) + .inspect_err(|error| error!(name: "cannot serialize config after applying patches", ?error)) .ok()?; if new_config.guild_id != guild_id { @@ -138,8 +197,7 @@ impl GuildsEditing { .ok()?; context.mongodb.configs_cache.remove(&guild_id); - guild_lock.changes = Value::Object(Map::new()); - guild_lock.edited_by.clear(); + guild_lock.changes = vec![]; Some(()) } diff --git a/src/server/guild/ws.rs b/src/server/guild/ws.rs index 267be3c..e1c0b10 100644 --- a/src/server/guild/ws.rs +++ b/src/server/guild/ws.rs @@ -1,12 +1,12 @@ use std::borrow::Cow; use std::sync::Arc; use futures_util::{SinkExt, StreamExt}; +use json_patch::Patch; use mongodb::bson::oid::ObjectId; use serde::{Deserialize, Serialize}; -use serde_json::Value; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::wrappers::UnboundedReceiverStream; -use tracing::{error, info}; +use tracing::{error, info, warn}; use twilight_model::id::Id; use twilight_model::id::marker::UserMarker; use twilight_model::user::CurrentUserGuild; @@ -15,7 +15,7 @@ use crate::context::Context; use crate::database::redis::PartialGuild; use crate::models::config::GuildConfig; use crate::ok_or_return; -use crate::server::guild::editing::GuildsEditing; +use crate::server::guild::editing::{Change, GuildsEditing}; use crate::server::session::AuthorizationInformation; macro_rules! close { @@ -28,14 +28,17 @@ macro_rules! unwrap_or_close_and_return { ($target: expr, $tx: expr, $reason: expr) => { match $target { Ok(value) => value, - Err(_) => { - close!($tx, $reason); + Err(err) => { + let reason = $reason; + tracing::warn!(name: "connection closed due to error", ?err, ?reason); + close!($tx, reason); return } } }; } +#[derive(Debug)] pub enum CloseReason { MessageIsNotString, CannotParseJSON, @@ -97,26 +100,33 @@ pub async fn handle_connection( Message::close_with(reason.code(), reason.text()) ).await; guilds_editing.remove_connection(guild_id, session_id).await; - guilds_editing.broadcast_changes(&context, guild_id).await; + guilds_editing.broadcast_users(guild_id).await; } } } let _ = ws_tx.close().await; }); + + guilds_editing.broadcast_users(guild_id).await; + guilds_editing.add_connection(guild_id, Connection { user_id: info.user.id, session_id, tx: tx.to_owned(), }).await; - let _ = tx.send(OutboundAction::Message(OutboundMessage::Initialization { - cached: ok_or_return!(context.redis.get_guild(guild.id).await, Ok), - oauth2: guild.to_owned(), - session_id - })); - - guilds_editing.broadcast_changes(&context, guild_id).await; + if let Some((saved_config, changes, users)) = + guilds_editing.get_initialization_data(&context, guild_id).await { + let _ = tx.send(OutboundAction::Message(OutboundMessage::Initialization { + cached: ok_or_return!(context.redis.get_guild(guild.id).await, Ok), + oauth2: guild.to_owned(), + saved_config, + changes, + users, + session_id + })); + } while let Some(result) = ws_rx.next().await { let message = match result { @@ -136,12 +146,12 @@ pub async fn handle_connection( } guilds_editing.remove_connection(guild_id, session_id).await; - guilds_editing.broadcast_changes(&context, guild_id).await; + guilds_editing.broadcast_users(guild_id).await; } #[derive(Debug, Deserialize)] #[serde(tag = "action", content = "data")] enum InboundMessage { - GuildConfigUpdate(Value), + GuildConfigUpdate(Patch), ApplyChanges } @@ -151,13 +161,17 @@ pub enum OutboundMessage { Initialization { oauth2: CurrentUserGuild, cached: PartialGuild, - session_id: ObjectId - }, - UpdateConfigurationData { + session_id: ObjectId, saved_config: GuildConfig, - changes: Value, + changes: Vec, users: Vec> - } + }, + OverwriteConfigurationData { + saved_config: GuildConfig, + changes: Vec + }, + OverwriteUsers(Vec>), + PushChange(Change) } pub enum OutboundAction { @@ -183,8 +197,8 @@ async fn on_message( match message { InboundMessage::GuildConfigUpdate(changes) => { - let _ = guilds_editing.marge_changes(info.user.id, guild.id, changes).await; - let _ = guilds_editing.broadcast_changes(&context, guild.id).await; + let _ = guilds_editing.marge_changes(info.user.id, guild.id, changes.to_owned()).await; + let _ = guilds_editing.broadcast_change(guild.id, info.user.id, changes).await; } InboundMessage::ApplyChanges => { info!( @@ -193,7 +207,7 @@ async fn on_message( guild_id = %guild.id ); guilds_editing.apply_changes(&context, guild.id).await; - let _ = guilds_editing.broadcast_changes(&context, guild.id).await; + let _ = guilds_editing.broadcast_config_overwrite(&context, guild.id).await; let _ = context.redis.announce_config_update(guild.id).await .inspect_err(|error| { error!(name: "error sending guild_id to redis update announcer", ?error, %guild.id)