Add source from dataframe or list #30
base: main
Conversation
…separator in hash
eywalker left a comment
Only a few minor changes requested -- otherwise it looks amazing!
        raise ValueError(f"Columns not found in DataFrame: {missing_columns}")

        if tag_function is None:
            tag_function = self.__class__.default_tag_function
In this case, it would make sense for expected_tag_keys to be set to row_index.
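To make the suggestion concrete, here is a minimal sketch of how that could look. The names default_tag_function and expected_tag_keys mirror the diff, but the surrounding class, the DataFrameSourceSketch name, and the dict-based tag type are illustrative assumptions, not the PR's actual code.

```python
from typing import Any, Callable

import pandas as pd


class DataFrameSourceSketch:
    """Illustrative sketch only; not the PR's actual implementation."""

    @staticmethod
    def default_tag_function(row: pd.Series, index: int) -> dict[str, Any]:
        # the default tag is simply the positional row index
        return {"row_index": index}

    def __init__(
        self,
        dataframe: pd.DataFrame,
        tag_function: Callable[[pd.Series, int], dict[str, Any]] | None = None,
        expected_tag_keys: list[str] | None = None,
    ) -> None:
        self.dataframe = dataframe
        if tag_function is None:
            tag_function = self.__class__.default_tag_function
            # suggested change: the default tag function produces "row_index",
            # so expected_tag_keys can be set accordingly
            expected_tag_keys = ["row_index"]
        self.tag_function = tag_function
        self.expected_tag_keys = expected_tag_keys
```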
| "It generates its own stream from the DataFrame." | ||
| ) | ||
| # Claim uniqueness only if the default tag function is used | ||
| if self.tag_function == self.__class__.default_tag_function: |
Awesome!
        # Convert DataFrame to hashable representation
        df_subset = self.dataframe[self.columns]
        df_content = df_subset.to_dict('records')
        df_hashable = tuple(tuple(sorted(record.items())) for record in df_content)
Very nice -- amazing reproducibility on the data frame!
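For reference, here is a self-contained sketch of the same idea outside the class. The column selection and to_dict('records') conversion follow the diff; hashing the repr of the result with sha256 is an assumption about how the final hash might be computed, not the PR's actual code.

```python
import hashlib

import pandas as pd


def dataframe_content_hash(df: pd.DataFrame, columns: list[str]) -> str:
    # reduce the selected columns to a deterministic tuple-of-tuples,
    # independent of key order within each record
    records = df[columns].to_dict("records")
    hashable = tuple(tuple(sorted(record.items())) for record in records)
    # hashing the repr is illustrative; the PR may combine this differently
    return hashlib.sha256(repr(hashable).encode("utf-8")).hexdigest()


# identical content yields identical hashes regardless of construction order
df_a = pd.DataFrame({"x": [1, 2], "y": ["a", "b"]})
df_b = pd.DataFrame({"y": ["a", "b"], "x": [1, 2]})
assert dataframe_content_hash(df_a, ["x", "y"]) == dataframe_content_hash(df_b, ["x", "y"])
```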
        in the packet, with the corresponding row values as the packet values.
        data : pd.DataFrame
            The pandas DataFrame to source data from
        tag_function : Callable[[pd.Series, int], Tag] | None, default=None
Would you mind adding a feature where, if a list of strings is passed, the strings are interpreted as columns whose values should be used as the tags?
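One possible shape for that feature, purely as a sketch: accept either a callable or a sequence of column names, and in the latter case build a tag function that reads those columns. The helper name resolve_tag_function and the dict-based tag type are illustrative assumptions.

```python
from typing import Any, Callable, Sequence

import pandas as pd

TagFunction = Callable[[pd.Series, int], dict[str, Any]]


def resolve_tag_function(
    tag_function: TagFunction | Sequence[str] | None,
) -> TagFunction | None:
    # callables and None pass through unchanged
    if tag_function is None or callable(tag_function):
        return tag_function
    # a sequence of strings: each entry names a column whose value becomes a tag
    columns = list(tag_function)

    def column_tag_function(row: pd.Series, index: int) -> dict[str, Any]:
        return {column: row[column] for column in columns}

    return column_tag_function
```

With that in place, expected_tag_keys could also default to the given column names.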
        self.expected_tag_keys = expected_tag_keys

        if tag_function is None:
            tag_function = self.__class__.default_tag_function
If the default tag function is used, let's have expected_tag_keys updated to element_index.
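Mirroring the DataFrame sketch above, the list-backed source could do the same with element_index. Again, the class and attribute names here are assumptions for illustration.

```python
from typing import Any, Callable, Sequence


class ListSourceSketch:
    """Illustrative sketch only; not the PR's actual implementation."""

    @staticmethod
    def default_tag_function(element: Any, index: int) -> dict[str, Any]:
        # the default tag is the element's position in the list
        return {"element_index": index}

    def __init__(
        self,
        elements: Sequence[Any],
        tag_function: Callable[[Any, int], dict[str, Any]] | None = None,
        expected_tag_keys: list[str] | None = None,
    ) -> None:
        self.elements = list(elements)
        if tag_function is None:
            tag_function = self.__class__.default_tag_function
            # suggested change: advertise the key the default tag function produces
            expected_tag_keys = ["element_index"]
        self.tag_function = tag_function
        self.expected_tag_keys = expected_tag_keys
```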
    def claims_unique_tags(
        self, *streams: "SyncStream", trigger_run: bool = True
    ) -> bool | None:
        if len(streams) != 0:
Note to self: we should probably extract the stream input check into a separate function, since this check is repeated not only here but in many places throughout operators and sources. Perhaps the pre-forward check should be formalized as a step.
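A rough illustration of what that extraction could look like; the helper name and error message are hypothetical, and SyncStream is referenced only as a forward annotation.

```python
def check_no_input_streams(*streams: "SyncStream") -> None:
    # centralizes the repeated "sources take no input streams" check
    if len(streams) != 0:
        raise ValueError(
            "This source generates its own stream and does not accept input streams."
        )


# hypothetical usage inside a source:
# def claims_unique_tags(self, *streams, trigger_run=True):
#     check_no_input_streams(*streams)
#     ...
```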
Add working pipeline system implementation
Here are two additional ways to specify initial input data, from a DataFrame or a List. I tried to have the structure/code mimic that of GlobSource as closely as possible, but it does look like some code ends up being duplicated.
I'm also not sure the hashing will work as intended. For instance, how do we know whether the elements in the list are Path-like and the corresponding files should be hashed?
In any case, this code works on simple tests that I can formalize at some point.
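On the Path-like question, one illustrative option (not a commitment to this design) would be an explicit check on each element before deciding how to hash it:

```python
import hashlib
import os
from pathlib import Path
from typing import Any


def element_digest(element: Any) -> str:
    # hypothetical sketch: treat strings and os.PathLike objects that point to an
    # existing file as files whose contents should be hashed
    if isinstance(element, (str, os.PathLike)):
        path = Path(element)
        if path.is_file():
            return hashlib.sha256(path.read_bytes()).hexdigest()
    # everything else falls back to hashing the value's repr (assumes a stable repr)
    return hashlib.sha256(repr(element).encode("utf-8")).hexdigest()
```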