diff --git a/deepola/Cargo.toml b/deepola/Cargo.toml index fd9ec41..2a43847 100644 --- a/deepola/Cargo.toml +++ b/deepola/Cargo.toml @@ -1,4 +1,5 @@ [workspace] members = [ "wake", + "prototype", ] diff --git a/deepola/prototype/Cargo.toml b/deepola/prototype/Cargo.toml new file mode 100644 index 0000000..bd8d637 --- /dev/null +++ b/deepola/prototype/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "prototype" +version = "0.1.0" +edition = "2021" +autoexamples = false + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +concurrent-queue = "1.2.2" +csv = "1.1" +env_logger = "0.9.0" +fixed-vec-deque = "0.1.9" +generator = "0.6" +getset = "0.1.2" +itertools = "0.10" +log = "0.4.14" +nanoid = "0.4.0" +quick-error = "2.0.1" +rand = "0.8.5" +rayon = "1.5.2" +rustc-hash = "1.1.0" +simple-error = "0.2.3" +structopt = "0.3.26" +uuid = { version = "0.8", features = ["v4"] } +jemallocator = "0.3.2" +glob = "0.3.0" +alphanumeric-sort = "1.4.4" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0.85" +statrs = "0.16.0" + +[dev-dependencies] +ctor = "0.1.21" +env_logger = "0.9.0" +criterion = "0.3.5" +lazy_static = "1.4.0" +regex = "1.6.0" diff --git a/deepola/prototype/src/lib.rs b/deepola/prototype/src/lib.rs new file mode 100644 index 0000000..f656a94 --- /dev/null +++ b/deepola/prototype/src/lib.rs @@ -0,0 +1,5 @@ +mod node; +mod operations; + +pub use node::*; +pub use operations::*; \ No newline at end of file diff --git a/deepola/prototype/src/node.rs b/deepola/prototype/src/node.rs new file mode 100644 index 0000000..18df16f --- /dev/null +++ b/deepola/prototype/src/node.rs @@ -0,0 +1,136 @@ +use std::{sync::{Arc, RwLock}, cell::RefCell}; +use getset::{Getters,Setters}; + +use crate::BufferedProcessor; + +#[derive(Getters, Setters)] +pub struct ExecutionNode { + parents: Vec>>, + operation: Arc>>, +} + +unsafe impl Send for ExecutionNode {} +unsafe impl Sync for ExecutionNode {} + +impl ExecutionNode { + pub fn new(operation: Box) -> Arc> { + let execution_node = Arc::new(RwLock::new( + ExecutionNode { + parents: vec![], + operation: Arc::new(RefCell::new(operation)), + })); + execution_node.as_ref().read().unwrap().operation.as_ref().borrow_mut().set_node(Arc::downgrade(&execution_node)); + execution_node + } + + pub fn get(&mut self) -> Option { + todo!("User facing interface that allows to obtain the result for next partition"); + } + + pub fn run(&mut self) { + todo!("Run execution of all available partitions"); + } + + // Node obtains ith output operation by invoking the operation's get_output_partition. + pub fn get_output_partition(&self, partition_num: usize) -> Option { + let result = self.operation.as_ref().borrow_mut().get_output_partition(partition_num); + result + } + + // Interface to obtain `partition_num` partition from the parent node at `seq_no`. + // The operation panics if no such parent node is available else returns the partition. + pub fn get_input_partition(&self, seq_no: usize, partition_num: usize) -> Option { + if seq_no >= self.parents.len() { + panic!("No parent node at seq_no: {}", seq_no); + } else { + let parent_node_rc = self.parents.get(seq_no).unwrap(); + let result = parent_node_rc.as_ref().read().unwrap().get_output_partition(partition_num); + result + } + } + + // Connect nodes through parent pointers. + pub fn subscribe_to_node(&mut self, parent: &Arc>) { + self.parents.push(parent.clone()); + } + + pub fn get_all_results(&self) -> Vec { + let mut count = 0; + let mut output = vec![]; + loop { + let result = self.get_output_partition(count); + match result { + Some(x) => {output.push(x); count += 1;}, + None => {break;} + } + } + output + } + +} + +#[cfg(test)] +mod tests { + use std::thread; + + use crate::*; + + #[test] + fn test_single_mapper_node() { + let input_data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + let expected_add_result = vec![1, 3, 6, 10, 15, 21, 28, 36, 45, 55]; + + // Creating Operations + let reader = Box::new(Reader::new(input_data)); + let add = Box::new(Mapper::new(0, MapperOp::Add)); + + // Creating Nodes from these operations + let reader_node = ExecutionNode::new(reader); + let add_node = ExecutionNode::new(add); + + // Connecting the created nodes. + add_node.as_ref().write().unwrap().subscribe_to_node(&reader_node); + + // Obtain the result from the final node in the execution graph one by one. + let mut count = 0; + loop { + let result = add_node.as_ref().read().unwrap().get_output_partition(count); + match result { + Some(x) => { assert_eq!(x, *expected_add_result.get(count).unwrap()); }, + None => { break; } + } + count += 1; + } + } + + #[test] + fn test_multiple_nodes_in_parallel() { + let input_data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + let expected_add_result = vec![1, 3, 6, 10, 15, 21, 28, 36, 45, 55]; + let expected_mul_result = vec![1, 2, 6, 24, 120, 720, 5040, 40320, 362880, 3628800]; + + let reader = Box::new(Reader::new(input_data)); + let add = Box::new(Mapper::new(0, MapperOp::Add)); + let mul = Box::new(Mapper::new(1, MapperOp::Mul)); + + let reader_node = ExecutionNode::new(reader); + let add_node = ExecutionNode::new(add); + let mul_node = ExecutionNode::new(mul); + + // Connect the nodes. write() since mut. + add_node.as_ref().write().unwrap().subscribe_to_node(&reader_node); + mul_node.as_ref().write().unwrap().subscribe_to_node(&reader_node); + + let handle_1 = thread::spawn(move || { + add_node.as_ref().read().unwrap().get_all_results() + }); + + let handle_2 = thread::spawn(move || { + mul_node.as_ref().read().unwrap().get_all_results() + }); + + assert_eq!(handle_1.join().unwrap(), expected_add_result); + assert_eq!(handle_2.join().unwrap(), expected_mul_result); + } + +} \ No newline at end of file diff --git a/deepola/prototype/src/operations.rs b/deepola/prototype/src/operations.rs new file mode 100644 index 0000000..b5194ac --- /dev/null +++ b/deepola/prototype/src/operations.rs @@ -0,0 +1,104 @@ +use std::sync::{Weak, RwLock}; +use std::convert::AsRef; + +use getset::{Getters, Setters}; + +use crate::ExecutionNode; + +pub trait BufferedProcessor { + fn map(&mut self, input: i64) -> i64 { input } + + fn get_output_partition(&mut self, partition_num: usize) -> Option; + + fn set_node(&mut self, node: Weak>); +} + +// Concrete operations that implement the BufferedProcessor trait. +// Operation 1: Reader [] => This is a generator for input data. +#[derive(Getters, Setters)] +pub struct Reader { + node: Option>>, + input_data: Vec, +} + +impl Reader { + pub fn new(input_data: Vec) -> Self { + Reader { + node: None, + input_data + } + } +} + +impl BufferedProcessor for Reader { + fn get_output_partition(&mut self, partition_num: usize) -> Option { + // This operation takes the ith input_data (or file_name). + // Returns the value read (or dataframe). + let value = self.input_data.get(partition_num); + if let Some(x) = value { + Some(*x) + } else { + None + } + } + + fn map(&mut self, input: i64) -> i64 { + input + } + + // Need to have this operation because `dyn BufferedProcessor` cannot do .node = <> + fn set_node(&mut self, node: Weak>) { + self.node = Some(node); + } +} + +pub enum MapperOp { + Add, + Mul +} + +// Operation 2: Appender [] => Can have custom map implementations. +#[derive(Getters, Setters)] +pub struct Mapper { + node: Option>>, + acc: i64, + op: MapperOp, +} + +impl Mapper { + pub fn new(acc: i64, op : MapperOp) -> Self { + Mapper { + node: None, + acc, + op, + } + } +} + +impl BufferedProcessor for Mapper { + fn get_output_partition(&mut self, partition_num: usize) -> Option { + match &self.node { + Some(node) => { + let execution_node = node.upgrade().unwrap(); + let input_partition = execution_node.as_ref().read().unwrap().get_input_partition(0, partition_num); + match input_partition { + Some(a) => Some(self.map(a)), + None => None + } + }, + None => panic!("ExecutionNode not created!") + } + } + + fn map(&mut self, input: i64) -> i64 { + match self.op { + MapperOp::Add => { self.acc += input; }, + MapperOp::Mul => { self.acc *= input; }, + } + self.acc + } + + fn set_node(&mut self, node: Weak>) { + self.node = Some(node); + } +}