
Conversation

@brian-arnold
Collaborator

Here are two additional ways to specify initial input data, from a DataFrame or from a list. I tried to have the structure and code mimic GlobSource as closely as possible, but as a result some code appears to be duplicated.

I'm also not sure whether the hashing will work as intended. For instance, how do we know whether the elements in the lists are PathLike, so that the corresponding files should be hashed?

In any case, this code passes the simple tests I've run, which I can formalize at some point.
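One way to answer the PathLike question above: only treat an element as a file reference when it implements os.PathLike (a bare string does not) and the path actually exists on disk. This is a minimal sketch, not the project's actual hashing logic; hash_element is a hypothetical name.

```python
import hashlib
import os
from pathlib import Path


def hash_element(element: object) -> str:
    # Treat an element as a file reference only when it is an
    # os.PathLike object (a bare string is NOT os.PathLike) and the
    # path actually points at a file on disk.
    if isinstance(element, os.PathLike):
        path = Path(element)
        if path.is_file():
            return hashlib.sha256(path.read_bytes()).hexdigest()
    # Otherwise fall back to hashing the element's repr().
    return hashlib.sha256(repr(element).encode()).hexdigest()
```

This keeps plain strings out of the file-hashing path, so `["a", "b"]` hashes as values while `[Path("data/a.csv")]` hashes file contents.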

@brian-arnold brian-arnold requested a review from eywalker July 1, 2025 00:04
Contributor

@eywalker left a comment


Only a few minor changes requested -- otherwise it looks amazing!

raise ValueError(f"Columns not found in DataFrame: {missing_columns}")

if tag_function is None:
tag_function = self.__class__.default_tag_function

In this case, it would make sense for expected_tag_keys to be set to row_index.

"It generates its own stream from the DataFrame."
)
# Claim uniqueness only if the default tag function is used
if self.tag_function == self.__class__.default_tag_function:

Awesome!

# Convert DataFrame to hashable representation
df_subset = self.dataframe[self.columns]
df_content = df_subset.to_dict('records')
df_hashable = tuple(tuple(sorted(record.items())) for record in df_content)

Very nice -- amazing reproducibility on the data frame!
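For reference, the records-to-sorted-tuples conversion from the snippet above yields a representation whose repr() is deterministic, so it can feed a stable digest. A sketch with made-up example data:

```python
import hashlib

import pandas as pd

df = pd.DataFrame({"subject": ["a", "b"], "session": [1, 2]})

# Same conversion as in the snippet above: records -> sorted item tuples.
records = df[["subject", "session"]].to_dict("records")
df_hashable = tuple(tuple(sorted(record.items())) for record in records)

# repr() of the nested tuples is deterministic, so the digest is stable
# for identical frame content regardless of the frame's column order.
digest = hashlib.sha256(repr(df_hashable).encode()).hexdigest()
```

Because each record's items are sorted by key, two frames with the same content but different column order produce the same digest.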

in the packet, with the corresponding row values as the packet values.
data : pd.DataFrame
The pandas DataFrame to source data from
tag_function : Callable[[pd.Series, int], Tag] | None, default=None

Would you mind adding a feature where, if a list of strings is passed, the strings are interpreted as column names whose values should be used as the tags?
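One way to support this would be to build a tag function from the column names. This is a hedged sketch of the requested feature; make_column_tag_function is a hypothetical helper, and the tag is assumed to be a plain dict of column-value pairs:

```python
import pandas as pd


def make_column_tag_function(columns):
    # Hypothetical helper: interpret a list of column names as the
    # columns whose row values should become the tag.
    def tag_function(row: pd.Series, row_index: int):
        return {col: row[col] for col in columns}

    return tag_function


df = pd.DataFrame({"subject": ["a", "b"], "session": [1, 2]})
tag_fn = make_column_tag_function(["subject"])
```

The constructor could then normalize its tag_function argument: a callable is used as-is, while a list of strings goes through a helper like this one.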

self.expected_tag_keys = expected_tag_keys

if tag_function is None:
tag_function = self.__class__.default_tag_function

If using the default tag function, let's have expected_tag_keys be updated to element_index.
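The same pattern as the earlier row_index suggestion applies here, just with element_index for the list source. A compact illustrative sketch (names beyond expected_tag_keys and default_tag_function are assumptions):

```python
class ListSourceSketch:
    # Illustrative sketch only, not the project's actual class.

    @staticmethod
    def default_tag_function(element, element_index):
        # Assumption: the default tags each element by its index.
        return {"element_index": element_index}

    def __init__(self, tag_function=None):
        self.expected_tag_keys = None
        if tag_function is None:
            tag_function = self.default_tag_function
            # Default tag keys are known up front, so update
            # expected_tag_keys to element_index as suggested.
            self.expected_tag_keys = ("element_index",)
        self.tag_function = tag_function
```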

def claims_unique_tags(
self, *streams: "SyncStream", trigger_run: bool = True
) -> bool | None:
if len(streams) != 0:

Note to self: we should probably extract the stream input check into a separate function, since this check is repeated not only here but in many places throughout operators and sources. Perhaps a pre-forward check should be formalized as a step.
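The extracted check might look like this sketch; check_source_inputs is a hypothetical name, and the error wording is illustrative:

```python
def check_source_inputs(streams, source_name):
    # Sketch of the shared pre-forward check suggested above: sources
    # generate their own stream, so any upstream streams are an error.
    if len(streams) != 0:
        raise ValueError(
            f"{source_name} is a source and takes no input streams; "
            f"received {len(streams)}"
        )
```

Each source's claims_unique_tags (and similar methods) could then call this helper instead of repeating the len(streams) != 0 check inline.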

