From 97fce254010005d22a4da7c70511399c4ead8694 Mon Sep 17 00:00:00 2001 From: Nikhil Sheoran Date: Wed, 7 Dec 2022 14:19:42 -0600 Subject: [PATCH 1/3] Add initial version. Calls function of parent node to obtain dataframe --- deepola/Cargo.toml | 1 + deepola/prototype/Cargo.toml | 38 +++++++ deepola/prototype/src/lib.rs | 3 + deepola/prototype/src/node.rs | 183 ++++++++++++++++++++++++++++++++++ 4 files changed, 225 insertions(+) create mode 100644 deepola/prototype/Cargo.toml create mode 100644 deepola/prototype/src/lib.rs create mode 100644 deepola/prototype/src/node.rs diff --git a/deepola/Cargo.toml b/deepola/Cargo.toml index fd9ec41..2a43847 100644 --- a/deepola/Cargo.toml +++ b/deepola/Cargo.toml @@ -1,4 +1,5 @@ [workspace] members = [ "wake", + "prototype", ] diff --git a/deepola/prototype/Cargo.toml b/deepola/prototype/Cargo.toml new file mode 100644 index 0000000..bd8d637 --- /dev/null +++ b/deepola/prototype/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "prototype" +version = "0.1.0" +edition = "2021" +autoexamples = false + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +concurrent-queue = "1.2.2" +csv = "1.1" +env_logger = "0.9.0" +fixed-vec-deque = "0.1.9" +generator = "0.6" +getset = "0.1.2" +itertools = "0.10" +log = "0.4.14" +nanoid = "0.4.0" +quick-error = "2.0.1" +rand = "0.8.5" +rayon = "1.5.2" +rustc-hash = "1.1.0" +simple-error = "0.2.3" +structopt = "0.3.26" +uuid = { version = "0.8", features = ["v4"] } +jemallocator = "0.3.2" +glob = "0.3.0" +alphanumeric-sort = "1.4.4" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0.85" +statrs = "0.16.0" + +[dev-dependencies] +ctor = "0.1.21" +env_logger = "0.9.0" +criterion = "0.3.5" +lazy_static = "1.4.0" +regex = "1.6.0" diff --git a/deepola/prototype/src/lib.rs b/deepola/prototype/src/lib.rs new file mode 100644 index 0000000..f9caa99 --- /dev/null +++ b/deepola/prototype/src/lib.rs @@ -0,0 +1,3 @@ +mod node; + +pub use node::*; \ No newline at end of file diff --git a/deepola/prototype/src/node.rs b/deepola/prototype/src/node.rs new file mode 100644 index 0000000..5bb51ae --- /dev/null +++ b/deepola/prototype/src/node.rs @@ -0,0 +1,183 @@ +use std::{cell::{RefCell, Ref}, rc::{Rc, Weak}, borrow::{Borrow, BorrowMut}}; +use std::convert::AsRef; +use getset::{Getters,Setters}; + +// pub struct ExecutionService { +// count: usize, +// nodes: Vec>>, +// } + +pub trait BufferedProcessor { + fn map(&mut self, input: i64) -> i64 { input } + + fn get_output_partition(&mut self, partition_num: usize) -> Option; + + fn set_node(&mut self, node: Weak>); +} + +#[derive(Getters, Setters)] +pub struct ExecutionNode { + node_id: usize, + parents: Vec>>, + progress: usize, + operation: Rc>>, +} + +impl ExecutionNode { + // fn new(operation: Box) -> Rc> { + // let execution_node = Rc::new(RefCell::new( + // ExecutionNode { + // node_id: 0, + // parents: vec![], + // operation: Rc::clone(&Rc::new(RefCell::new(operation))), + // } + // )); + // operation.as_ref().borrow_mut().set_node(Rc::downgrade(&execution_node)); + // execution_node + // } + + // fn get(&mut self, progress) -> Option { + // let next_result = 1 + self.progress; + // let result = self.get_output_partition(next_result); + // self.progress += 1; + // result + // } + + fn new(operation: Box) -> Rc> { + let execution_node = Rc::new(RefCell::new( + ExecutionNode { + node_id: 0, + parents: vec![], + progress: 0, + operation: Rc::clone(&Rc::new(RefCell::new(operation))), + })); + execution_node.as_ref().borrow().operation.as_ref().borrow_mut().set_node(Rc::downgrade(&execution_node)); + execution_node + } + + fn run(&mut self) { + println!("Starting Execution"); + } + + fn get_output_partition(&self, partition_num: usize) -> Option { + let result = self.operation.as_ref().borrow_mut().get_output_partition(partition_num); + result + } + + fn get_input_partition(&self, seq_no: usize, partition_num: usize) -> Option { + println!("Getting Input from seq_no: {} when parents are: {}", seq_no, self.parents.len()); + if seq_no >= self.parents.len() { + None + } else { + let parent_node_rc = self.parents.get(seq_no).unwrap(); + let result = parent_node_rc.as_ref().borrow().get_output_partition(partition_num); + result + } + } + + fn subscribe_to_node(&mut self, parent: &Rc>) { + self.parents.push(parent.clone()); + } + +} + +// Concrete operations that implement the BufferedProcessor trait. + +// Operation 1: Reader [] => This is a generator for input data. +pub struct Reader { + node: Option>>, + input_data: Vec, +} +impl BufferedProcessor for Reader { + fn get_output_partition(&mut self, partition_num: usize) -> Option { + println!("GET OUTPUT PARTITION CALLED FOR READER"); + let value = self.input_data.get(partition_num); + if let Some(x) = value { + Some(*x) + } else { + None + } + } + + fn map(&mut self, input: i64) -> i64 { + input + } + + fn set_node(&mut self, node: Weak>) { + self.node = Some(node); + } +} + +pub enum MapperOp { + Add, + Mul +} + +// Operation 2: Appender [] => Can have custom map implementations. +pub struct Mapper { + node: Option>>, + acc: i64, + op: MapperOp, +} +impl BufferedProcessor for Mapper { + fn get_output_partition(&mut self, partition_num: usize) -> Option { + println!("GET OUTPUT PARTITION CALLED FOR MAPPER"); + match &self.node { + Some(node) => { + let execution_node = node.upgrade().unwrap(); + let input_partition = execution_node.as_ref().borrow().get_input_partition(0, partition_num); + match input_partition { + Some(a) => Some(self.map(a)), + None => None + } + }, + None => panic!("ExecutionNode not created!") + } + } + + fn map(&mut self, input: i64) -> i64 { + match self.op { + MapperOp::Add => { self.acc += input; }, + MapperOp::Mul => { self.acc *= input; }, + } + self.acc + } + + fn set_node(&mut self, node: Weak>) { + self.node = Some(node); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_node_creation() { + println!("Test Runs"); + let reader = Box::new(Reader { node: None, input_data : vec![1,2,3,4,5,6,7,8,9,10]}); + let adder = Box::new(Mapper { node: None, acc: 0, op: MapperOp::Add}); + let multiplier = Box::new(Mapper { node: None, acc: 1, op: MapperOp::Mul}); + + let reader_node = ExecutionNode::new(reader); // Specify lazy mode + let adder_node = ExecutionNode::new(adder); + let multiplier_node = ExecutionNode::new(multiplier); // Specify lazy mode + + adder_node.as_ref().borrow_mut().subscribe_to_node(&reader_node); + multiplier_node.as_ref().borrow_mut().subscribe_to_node(&reader_node); + + println!("Nodes Created. Subscriptions Done."); + + let mut count = 0; + loop { + let result = adder_node.as_ref().borrow().get_output_partition(count); + // let result = multiplier_node.as_ref().borrow().get_output_partition(count); + match result { + Some(x) => { println!("Result: {:?}", x); }, + None => { break; } + } + count += 1; + } + } + +} \ No newline at end of file From de623cfe4598a4daa23157d6614775398af4ce37 Mon Sep 17 00:00:00 2001 From: Nikhil Sheoran Date: Wed, 7 Dec 2022 16:15:42 -0600 Subject: [PATCH 2/3] Re-factor and add comments --- deepola/prototype/src/lib.rs | 4 +- deepola/prototype/src/node.rs | 179 ++++++++-------------------- deepola/prototype/src/operations.rs | 105 ++++++++++++++++ 3 files changed, 160 insertions(+), 128 deletions(-) create mode 100644 deepola/prototype/src/operations.rs diff --git a/deepola/prototype/src/lib.rs b/deepola/prototype/src/lib.rs index f9caa99..f656a94 100644 --- a/deepola/prototype/src/lib.rs +++ b/deepola/prototype/src/lib.rs @@ -1,3 +1,5 @@ mod node; +mod operations; -pub use node::*; \ No newline at end of file +pub use node::*; +pub use operations::*; \ No newline at end of file diff --git a/deepola/prototype/src/node.rs b/deepola/prototype/src/node.rs index 5bb51ae..8ce5bf8 100644 --- a/deepola/prototype/src/node.rs +++ b/deepola/prototype/src/node.rs @@ -1,73 +1,51 @@ -use std::{cell::{RefCell, Ref}, rc::{Rc, Weak}, borrow::{Borrow, BorrowMut}}; -use std::convert::AsRef; +use std::{cell::RefCell, sync::Arc}; use getset::{Getters,Setters}; -// pub struct ExecutionService { -// count: usize, -// nodes: Vec>>, -// } - -pub trait BufferedProcessor { - fn map(&mut self, input: i64) -> i64 { input } - - fn get_output_partition(&mut self, partition_num: usize) -> Option; - - fn set_node(&mut self, node: Weak>); -} +use crate::BufferedProcessor; #[derive(Getters, Setters)] pub struct ExecutionNode { node_id: usize, - parents: Vec>>, + parents: Vec>>, progress: usize, - operation: Rc>>, + operation: Arc>>, } +unsafe impl Send for ExecutionNode {} +unsafe impl Sync for ExecutionNode {} + impl ExecutionNode { - // fn new(operation: Box) -> Rc> { - // let execution_node = Rc::new(RefCell::new( - // ExecutionNode { - // node_id: 0, - // parents: vec![], - // operation: Rc::clone(&Rc::new(RefCell::new(operation))), - // } - // )); - // operation.as_ref().borrow_mut().set_node(Rc::downgrade(&execution_node)); - // execution_node - // } - - // fn get(&mut self, progress) -> Option { - // let next_result = 1 + self.progress; - // let result = self.get_output_partition(next_result); - // self.progress += 1; - // result - // } - - fn new(operation: Box) -> Rc> { - let execution_node = Rc::new(RefCell::new( + pub fn new(operation: Box) -> Arc> { + let execution_node = Arc::new(RefCell::new( ExecutionNode { - node_id: 0, - parents: vec![], - progress: 0, - operation: Rc::clone(&Rc::new(RefCell::new(operation))), + node_id: 0, + parents: vec![], + progress: 0, + operation: Arc::new(RefCell::new(operation)), })); - execution_node.as_ref().borrow().operation.as_ref().borrow_mut().set_node(Rc::downgrade(&execution_node)); + execution_node.as_ref().borrow().operation.as_ref().borrow_mut().set_node(Arc::downgrade(&execution_node)); execution_node } - fn run(&mut self) { - println!("Starting Execution"); + pub fn get(&mut self) -> Option { + todo!("User facing interface that allows to obtain the result for next partition"); } - fn get_output_partition(&self, partition_num: usize) -> Option { + pub fn run(&mut self) { + todo!("Run execution of all available partitions"); + } + + // Node obtains ith output operation by invoking the operation's get_output_partition. + pub fn get_output_partition(&self, partition_num: usize) -> Option { let result = self.operation.as_ref().borrow_mut().get_output_partition(partition_num); result } - fn get_input_partition(&self, seq_no: usize, partition_num: usize) -> Option { - println!("Getting Input from seq_no: {} when parents are: {}", seq_no, self.parents.len()); + // Interface to obtain `partition_num` partition from the parent node at `seq_no`. + // The operation panics if no such parent node is available else returns the partition. + pub fn get_input_partition(&self, seq_no: usize, partition_num: usize) -> Option { if seq_no >= self.parents.len() { - None + panic!("No parent node at seq_no: {}", seq_no); } else { let parent_node_rc = self.parents.get(seq_no).unwrap(); let result = parent_node_rc.as_ref().borrow().get_output_partition(partition_num); @@ -75,109 +53,56 @@ impl ExecutionNode { } } - fn subscribe_to_node(&mut self, parent: &Rc>) { + // Connect nodes through parent pointers. + pub fn subscribe_to_node(&mut self, parent: &Arc>) { self.parents.push(parent.clone()); } -} - -// Concrete operations that implement the BufferedProcessor trait. - -// Operation 1: Reader [] => This is a generator for input data. -pub struct Reader { - node: Option>>, - input_data: Vec, -} -impl BufferedProcessor for Reader { - fn get_output_partition(&mut self, partition_num: usize) -> Option { - println!("GET OUTPUT PARTITION CALLED FOR READER"); - let value = self.input_data.get(partition_num); - if let Some(x) = value { - Some(*x) - } else { - None - } - } - - fn map(&mut self, input: i64) -> i64 { - input - } - - fn set_node(&mut self, node: Weak>) { - self.node = Some(node); - } -} - -pub enum MapperOp { - Add, - Mul -} - -// Operation 2: Appender [] => Can have custom map implementations. -pub struct Mapper { - node: Option>>, - acc: i64, - op: MapperOp, -} -impl BufferedProcessor for Mapper { - fn get_output_partition(&mut self, partition_num: usize) -> Option { - println!("GET OUTPUT PARTITION CALLED FOR MAPPER"); - match &self.node { - Some(node) => { - let execution_node = node.upgrade().unwrap(); - let input_partition = execution_node.as_ref().borrow().get_input_partition(0, partition_num); - match input_partition { - Some(a) => Some(self.map(a)), - None => None - } - }, - None => panic!("ExecutionNode not created!") - } - } - - fn map(&mut self, input: i64) -> i64 { - match self.op { - MapperOp::Add => { self.acc += input; }, - MapperOp::Mul => { self.acc *= input; }, + pub fn get_all_results(&self) -> Vec { + let mut count = 0; + let mut output = vec![]; + loop { + let result = self.get_output_partition(count); + println!("Node: {}, Result: {:?}", self.node_id, result); + match result { + Some(x) => {output.push(x); count += 1;}, + None => {break;} + } } - self.acc + output } - fn set_node(&mut self, node: Weak>) { - self.node = Some(node); - } } #[cfg(test)] mod tests { - use super::*; + use std::thread; + + use crate::*; #[test] - fn test_node_creation() { - println!("Test Runs"); - let reader = Box::new(Reader { node: None, input_data : vec![1,2,3,4,5,6,7,8,9,10]}); - let adder = Box::new(Mapper { node: None, acc: 0, op: MapperOp::Add}); - let multiplier = Box::new(Mapper { node: None, acc: 1, op: MapperOp::Mul}); + fn test_single_mapper_node() { + // Creating Operations + let reader = Box::new(Reader::new(vec![1,2,3,4,5,6,7,8,9,10])); + let adder = Box::new(Mapper::new(0, MapperOp::Add)); - let reader_node = ExecutionNode::new(reader); // Specify lazy mode + // Creating Nodes from these operations + let reader_node = ExecutionNode::new(reader); let adder_node = ExecutionNode::new(adder); - let multiplier_node = ExecutionNode::new(multiplier); // Specify lazy mode + // Connecting the created nodes. adder_node.as_ref().borrow_mut().subscribe_to_node(&reader_node); - multiplier_node.as_ref().borrow_mut().subscribe_to_node(&reader_node); - - println!("Nodes Created. Subscriptions Done."); + // Obtain the result from the final node in the execution graph. + let expected_result = vec![1, 3, 6, 10, 15, 21, 28, 36, 45, 55]; let mut count = 0; loop { let result = adder_node.as_ref().borrow().get_output_partition(count); - // let result = multiplier_node.as_ref().borrow().get_output_partition(count); match result { - Some(x) => { println!("Result: {:?}", x); }, + Some(x) => { assert_eq!(x, *expected_result.get(count).unwrap()); }, None => { break; } } count += 1; } } - } \ No newline at end of file diff --git a/deepola/prototype/src/operations.rs b/deepola/prototype/src/operations.rs new file mode 100644 index 0000000..5677f1d --- /dev/null +++ b/deepola/prototype/src/operations.rs @@ -0,0 +1,105 @@ +use std::sync::Weak; +use std::cell::RefCell; +use std::convert::AsRef; + +use getset::{Getters, Setters}; + +use crate::ExecutionNode; + +pub trait BufferedProcessor { + fn map(&mut self, input: i64) -> i64 { input } + + fn get_output_partition(&mut self, partition_num: usize) -> Option; + + fn set_node(&mut self, node: Weak>); +} + +// Concrete operations that implement the BufferedProcessor trait. +// Operation 1: Reader [] => This is a generator for input data. +#[derive(Getters, Setters)] +pub struct Reader { + node: Option>>, + input_data: Vec, +} + +impl Reader { + pub fn new(input_data: Vec) -> Self { + Reader { + node: None, + input_data + } + } +} + +impl BufferedProcessor for Reader { + fn get_output_partition(&mut self, partition_num: usize) -> Option { + // This operation takes the ith input_data (or file_name). + // Returns the value read (or dataframe). + let value = self.input_data.get(partition_num); + if let Some(x) = value { + Some(*x) + } else { + None + } + } + + fn map(&mut self, input: i64) -> i64 { + input + } + + // Need to have this operation because `dyn BufferedProcessor` cannot do .node = <> + fn set_node(&mut self, node: Weak>) { + self.node = Some(node); + } +} + +pub enum MapperOp { + Add, + Mul +} + +// Operation 2: Appender [] => Can have custom map implementations. +#[derive(Getters, Setters)] +pub struct Mapper { + node: Option>>, + acc: i64, + op: MapperOp, +} + +impl Mapper { + pub fn new(acc: i64, op : MapperOp) -> Self { + Mapper { + node: None, + acc, + op, + } + } +} + +impl BufferedProcessor for Mapper { + fn get_output_partition(&mut self, partition_num: usize) -> Option { + match &self.node { + Some(node) => { + let execution_node = node.upgrade().unwrap(); + let input_partition = execution_node.as_ref().borrow().get_input_partition(0, partition_num); + match input_partition { + Some(a) => Some(self.map(a)), + None => None + } + }, + None => panic!("ExecutionNode not created!") + } + } + + fn map(&mut self, input: i64) -> i64 { + match self.op { + MapperOp::Add => { self.acc += input; }, + MapperOp::Mul => { self.acc *= input; }, + } + self.acc + } + + fn set_node(&mut self, node: Weak>) { + self.node = Some(node); + } +} From 24e8a11917387f262cbb10ba5fd57f29e46b674b Mon Sep 17 00:00:00 2001 From: Nikhil Sheoran Date: Wed, 7 Dec 2022 16:32:15 -0600 Subject: [PATCH 3/3] Change RefCell to RwLock and perform operations in parallel --- deepola/prototype/src/node.rs | 68 ++++++++++++++++++++--------- deepola/prototype/src/operations.rs | 15 +++---- 2 files changed, 55 insertions(+), 28 deletions(-) diff --git a/deepola/prototype/src/node.rs b/deepola/prototype/src/node.rs index 8ce5bf8..18df16f 100644 --- a/deepola/prototype/src/node.rs +++ b/deepola/prototype/src/node.rs @@ -1,13 +1,11 @@ -use std::{cell::RefCell, sync::Arc}; +use std::{sync::{Arc, RwLock}, cell::RefCell}; use getset::{Getters,Setters}; use crate::BufferedProcessor; #[derive(Getters, Setters)] pub struct ExecutionNode { - node_id: usize, - parents: Vec>>, - progress: usize, + parents: Vec>>, operation: Arc>>, } @@ -15,15 +13,13 @@ unsafe impl Send for ExecutionNode {} unsafe impl Sync for ExecutionNode {} impl ExecutionNode { - pub fn new(operation: Box) -> Arc> { - let execution_node = Arc::new(RefCell::new( + pub fn new(operation: Box) -> Arc> { + let execution_node = Arc::new(RwLock::new( ExecutionNode { - node_id: 0, parents: vec![], - progress: 0, operation: Arc::new(RefCell::new(operation)), })); - execution_node.as_ref().borrow().operation.as_ref().borrow_mut().set_node(Arc::downgrade(&execution_node)); + execution_node.as_ref().read().unwrap().operation.as_ref().borrow_mut().set_node(Arc::downgrade(&execution_node)); execution_node } @@ -48,13 +44,13 @@ impl ExecutionNode { panic!("No parent node at seq_no: {}", seq_no); } else { let parent_node_rc = self.parents.get(seq_no).unwrap(); - let result = parent_node_rc.as_ref().borrow().get_output_partition(partition_num); + let result = parent_node_rc.as_ref().read().unwrap().get_output_partition(partition_num); result } } // Connect nodes through parent pointers. - pub fn subscribe_to_node(&mut self, parent: &Arc>) { + pub fn subscribe_to_node(&mut self, parent: &Arc>) { self.parents.push(parent.clone()); } @@ -63,7 +59,6 @@ impl ExecutionNode { let mut output = vec![]; loop { let result = self.get_output_partition(count); - println!("Node: {}, Result: {:?}", self.node_id, result); match result { Some(x) => {output.push(x); count += 1;}, None => {break;} @@ -82,27 +77,60 @@ mod tests { #[test] fn test_single_mapper_node() { + let input_data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + let expected_add_result = vec![1, 3, 6, 10, 15, 21, 28, 36, 45, 55]; + // Creating Operations - let reader = Box::new(Reader::new(vec![1,2,3,4,5,6,7,8,9,10])); - let adder = Box::new(Mapper::new(0, MapperOp::Add)); + let reader = Box::new(Reader::new(input_data)); + let add = Box::new(Mapper::new(0, MapperOp::Add)); // Creating Nodes from these operations let reader_node = ExecutionNode::new(reader); - let adder_node = ExecutionNode::new(adder); + let add_node = ExecutionNode::new(add); // Connecting the created nodes. - adder_node.as_ref().borrow_mut().subscribe_to_node(&reader_node); + add_node.as_ref().write().unwrap().subscribe_to_node(&reader_node); - // Obtain the result from the final node in the execution graph. - let expected_result = vec![1, 3, 6, 10, 15, 21, 28, 36, 45, 55]; + // Obtain the result from the final node in the execution graph one by one. let mut count = 0; loop { - let result = adder_node.as_ref().borrow().get_output_partition(count); + let result = add_node.as_ref().read().unwrap().get_output_partition(count); match result { - Some(x) => { assert_eq!(x, *expected_result.get(count).unwrap()); }, + Some(x) => { assert_eq!(x, *expected_add_result.get(count).unwrap()); }, None => { break; } } count += 1; } } + + #[test] + fn test_multiple_nodes_in_parallel() { + let input_data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + let expected_add_result = vec![1, 3, 6, 10, 15, 21, 28, 36, 45, 55]; + let expected_mul_result = vec![1, 2, 6, 24, 120, 720, 5040, 40320, 362880, 3628800]; + + let reader = Box::new(Reader::new(input_data)); + let add = Box::new(Mapper::new(0, MapperOp::Add)); + let mul = Box::new(Mapper::new(1, MapperOp::Mul)); + + let reader_node = ExecutionNode::new(reader); + let add_node = ExecutionNode::new(add); + let mul_node = ExecutionNode::new(mul); + + // Connect the nodes. write() since mut. + add_node.as_ref().write().unwrap().subscribe_to_node(&reader_node); + mul_node.as_ref().write().unwrap().subscribe_to_node(&reader_node); + + let handle_1 = thread::spawn(move || { + add_node.as_ref().read().unwrap().get_all_results() + }); + + let handle_2 = thread::spawn(move || { + mul_node.as_ref().read().unwrap().get_all_results() + }); + + assert_eq!(handle_1.join().unwrap(), expected_add_result); + assert_eq!(handle_2.join().unwrap(), expected_mul_result); + } + } \ No newline at end of file diff --git a/deepola/prototype/src/operations.rs b/deepola/prototype/src/operations.rs index 5677f1d..b5194ac 100644 --- a/deepola/prototype/src/operations.rs +++ b/deepola/prototype/src/operations.rs @@ -1,5 +1,4 @@ -use std::sync::Weak; -use std::cell::RefCell; +use std::sync::{Weak, RwLock}; use std::convert::AsRef; use getset::{Getters, Setters}; @@ -11,14 +10,14 @@ pub trait BufferedProcessor { fn get_output_partition(&mut self, partition_num: usize) -> Option; - fn set_node(&mut self, node: Weak>); + fn set_node(&mut self, node: Weak>); } // Concrete operations that implement the BufferedProcessor trait. // Operation 1: Reader [] => This is a generator for input data. #[derive(Getters, Setters)] pub struct Reader { - node: Option>>, + node: Option>>, input_data: Vec, } @@ -48,7 +47,7 @@ impl BufferedProcessor for Reader { } // Need to have this operation because `dyn BufferedProcessor` cannot do .node = <> - fn set_node(&mut self, node: Weak>) { + fn set_node(&mut self, node: Weak>) { self.node = Some(node); } } @@ -61,7 +60,7 @@ pub enum MapperOp { // Operation 2: Appender [] => Can have custom map implementations. #[derive(Getters, Setters)] pub struct Mapper { - node: Option>>, + node: Option>>, acc: i64, op: MapperOp, } @@ -81,7 +80,7 @@ impl BufferedProcessor for Mapper { match &self.node { Some(node) => { let execution_node = node.upgrade().unwrap(); - let input_partition = execution_node.as_ref().borrow().get_input_partition(0, partition_num); + let input_partition = execution_node.as_ref().read().unwrap().get_input_partition(0, partition_num); match input_partition { Some(a) => Some(self.map(a)), None => None @@ -99,7 +98,7 @@ impl BufferedProcessor for Mapper { self.acc } - fn set_node(&mut self, node: Weak>) { + fn set_node(&mut self, node: Weak>) { self.node = Some(node); } }