
Conversation

@srawat98-dev (Contributor)

Summary

I extended the existing TableStatsCollectionSparkApp to populate the OpenhouseCommitEventTablePartitionStats table.

This new table will serve as the partition-level source of truth for statistics and commit metadata across all OpenHouse datasets. The table contains exactly one row per partition, where the commit metadata reflects the latest commit that modified that partition. Each record includes:

  1. Commit Metadata (from the latest commit that changed the respective partition)
    • Commit ID (snapshot_id)
    • Commit timestamp (committed_at)
    • Commit app ID (Spark application ID)
    • Commit app name (Spark application name)
    • Commit operation (APPEND, DELETE, OVERWRITE, REPLACE)
  2. Table Identifier (database, table, cluster, location, partition spec)
  3. Partition Data (typed column values for all partition columns)
  4. Table Level Stats (rowCount, columnCount)
  5. Field/Column Level Stats (nullCount, nanCount, minValue, maxValue, columnSizeInBytes)

This enables granular partition-level analytics and monitoring, providing:

  1. Partition-level statistics - Access detailed metrics (row counts, column stats) for each partition
  2. Latest state tracking - Know the current state of each partition and when it was last modified
  3. Fine-grained monitoring - Monitor data quality and distribution at partition granularity
  4. Optimized queries - Identify partitions to scan based on min/max values and data freshness
  5. Data profiling - Analyze data characteristics (nulls, NaNs, size) per partition for optimization
  6. Incremental processing - Efficiently identify which partitions contain relevant data for downstream pipelines (see the query sketch after this list)
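
A minimal sketch of how a downstream consumer might use these records for the partition-pruning and incremental-processing cases above, assuming the published partition stats are queryable through Spark under a hypothetical view name openhouse_partition_stats (the actual sink name and registration may differ); the selected fields mirror the JSON payload shown in the Output section below:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ChangedPartitionFinder {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("partition-stats-consumer").getOrCreate();

    // Checkpoint from the previous pipeline run (illustrative value only).
    long lastProcessedCommitTsMs = 1765444061000L;

    // One row per partition; commitMetadata reflects the latest commit that touched it,
    // so filtering on its timestamp yields exactly the partitions changed since the checkpoint.
    Dataset<Row> changedPartitions = spark.sql(
        "SELECT dataset.databaseName, dataset.tableName, partitionData, rowCount, "
            + "commitMetadata.commitTimestampMs "
            + "FROM openhouse_partition_stats "
            + "WHERE commitMetadata.commitTimestampMs > " + lastProcessedCommitTsMs);

    changedPartitions.show(false);
  }
}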

Output

This PR ensures the TableStatsCollectionSparkApp executes all four collection tasks (table stats, commit events, partition events, and partition stats) sequentially, while preserving the complete data collection and publishing functionality.

End-to-End Verification (Docker)

1. Sequential Execution Timeline

25/12/11 09:08:26 INFO spark.TableStatsCollectionSparkApp: Starting table stats collection for table: testdb.partition_stats_test
25/12/11 09:08:36 INFO spark.TableStatsCollectionSparkApp: Completed table stats collection for table: testdb.partition_stats_test in 9694 ms

25/12/11 09:08:36 INFO spark.TableStatsCollectionSparkApp: Starting commit events collection for table: testdb.partition_stats_test
25/12/11 09:08:38 INFO spark.TableStatsCollectionSparkApp: Completed commit events collection for table: testdb.partition_stats_test (3 events) in 2258 ms

25/12/11 09:08:38 INFO spark.TableStatsCollectionSparkApp: Starting partition events collection for table: testdb.partition_stats_test
25/12/11 09:08:41 INFO spark.TableStatsCollectionSparkApp: Completed partition events collection for table: testdb.partition_stats_test (3 partition events) in 3282 ms

25/12/11 09:08:41 INFO spark.TableStatsCollectionSparkApp: Starting partition stats collection for table: testdb.partition_stats_test
25/12/11 09:08:49 INFO spark.TableStatsCollectionSparkApp: Completed partition stats collection for table: testdb.partition_stats_test (3 partition stats) in 7895 ms

25/12/11 09:08:49 INFO spark.TableStatsCollectionSparkApp: Total collection time for table: testdb.partition_stats_test in 23137 ms

Key Points:

  • ✅ Tasks execute sequentially (no overlapping timestamps)
  • ✅ Each task starts immediately after previous completes
  • ✅ Total time = sum of individual tasks (9.7s + 2.3s + 3.3s + 7.9s ≈ 23.1s)
  • ✅ No "parallel execution" in log message (synchronous pattern confirmed)

2. publishStats Log Output

25/12/11 09:08:49 INFO spark.TableStatsCollectionSparkApp: Publishing stats for table: testdb.partition_stats_test 25/12/11 09:08:49 INFO spark.TableStatsCollectionSparkApp: {"databaseName":"testdb","tableName":"partition_stats_test","clusterName":"LocalHadoopCluster","tableMetadataLocation":"/data/openhouse/testdb/partition_stats_test-088a1368-1212-49b1-b3d9-b6cabdec290e","creationTimeMs":1765444016084,"numFiles":3,"sizeInBytes":3045,"numRows":5,"numColumns":4,"numPartitions":3,"earliestPartitionDate":"2024-01-01"}

Key Points:

  • ✅ Table-level stats published successfully
  • ✅ numRows: 5 (total across all partitions)
  • ✅ numPartitions: 3 (2024-01-01, 2024-01-02, 2024-01-03)
  • ✅ earliestPartitionDate: "2024-01-01" (correctly identified)
  • ✅ Table metadata and size metrics populated

3. publishCommitEvents Log Output

25/12/11 09:08:49 INFO spark.TableStatsCollectionSparkApp: Publishing commit events for table: testdb.partition_stats_test 25/12/11 09:08:49 INFO spark.TableStatsCollectionSparkApp: [{"dataset":{"databaseName":"testdb","tableName":"partition_stats_test","clusterName":"LocalHadoopCluster","tableMetadataLocation":"/data/openhouse/testdb/partition_stats_test-088a1368-1212-49b1-b3d9-b6cabdec290e","partitionSpec":"[\n 1000: event_time_day: day(4)\n]"},"commitMetadata":{"commitId":5642811578603876150,"commitTimestampMs":1765444061000,"commitAppId":"local-1765443996768","commitAppName":"Spark shell","commitOperation":"APPEND"},"eventTimestampMs":1765444129777},{"dataset":{"databaseName":"testdb","tableName":"partition_stats_test","clusterName":"LocalHadoopCluster","tableMetadataLocation":"/data/openhouse/testdb/partition_stats_test-088a1368-1212-49b1-b3d9-b6cabdec290e","partitionSpec":"[\n 1000: event_time_day: day(4)\n]"},"commitMetadata":{"commitId":7929592344081159299,"commitTimestampMs":1765444064000,"commitAppId":"local-1765443996768","commitAppName":"Spark shell","commitOperation":"APPEND"},"eventTimestampMs":1765444129777},{"dataset":{"databaseName":"testdb","tableName":"partition_stats_test","clusterName":"LocalHadoopCluster","tableMetadataLocation":"/data/openhouse/testdb/partition_stats_test-088a1368-1212-49b1-b3d9-b6cabdec290e","partitionSpec":"[\n 1000: event_time_day: day(4)\n]"},"commitMetadata":{"commitId":8368973829645132323,"commitTimestampMs":1765444066000,"commitAppId":"local-1765443996768","commitAppName":"Spark shell","commitOperation":"APPEND"},"eventTimestampMs":1765444129777}]

Key Points:

  • ✅ All 3 commit events published successfully
  • ✅ commitAppId: "local-1765443996768" (populated)
  • ✅ commitAppName: "Spark shell" (populated)
  • ✅ commitOperation: "APPEND" (properly parsed)
  • ✅ Commit timestamps in chronological order

4. publishPartitionEvents Log Output

25/12/11 09:08:49 INFO spark.TableStatsCollectionSparkApp: Publishing partition events for table: testdb.partition_stats_test 25/12/11 09:08:49 INFO spark.TableStatsCollectionSparkApp: [{"partitionData":[{"columnName":"event_time_day","value":"2024-01-01"}],"dataset":{"databaseName":"testdb","tableName":"partition_stats_test","clusterName":"LocalHadoopCluster","tableMetadataLocation":"/data/openhouse/testdb/partition_stats_test-088a1368-1212-49b1-b3d9-b6cabdec290e","partitionSpec":"[\n 1000: event_time_day: day(4)\n]"},"commitMetadata":{"commitId":5642811578603876150,"commitTimestampMs":1765444061000,"commitAppId":"local-1765443996768","commitAppName":"Spark shell","commitOperation":"APPEND"},"eventTimestampMs":1765444129777},{"partitionData":[{"columnName":"event_time_day","value":"2024-01-02"}],"dataset":{"databaseName":"testdb","tableName":"partition_stats_test","clusterName":"LocalHadoopCluster","tableMetadataLocation":"/data/openhouse/testdb/partition_stats_test-088a1368-1212-49b1-b3d9-b6cabdec290e","partitionSpec":"[\n 1000: event_time_day: day(4)\n]"},"commitMetadata":{"commitId":7929592344081159299,"commitTimestampMs":1765444064000,"commitAppId":"local-1765443996768","commitAppName":"Spark shell","commitOperation":"APPEND"},"eventTimestampMs":1765444129777},{"partitionData":[{"columnName":"event_time_day","value":"2024-01-03"}],"dataset":{"databaseName":"testdb","tableName":"partition_stats_test","clusterName":"LocalHadoopCluster","tableMetadataLocation":"/data/openhouse/testdb/partition_stats_test-088a1368-1212-49b1-b3d9-b6cabdec290e","partitionSpec":"[\n 1000: event_time_day: day(4)\n]"},"commitMetadata":{"commitId":8368973829645132323,"commitTimestampMs":1765444066000,"commitAppId":"local-1765443996768","commitAppName":"Spark shell","commitOperation":"APPEND"},"eventTimestampMs":1765444129777}]

Key Points:

  • ✅ All 3 partition events published successfully
  • ✅ partitionData: Contains partition column name and values (event_time_day: 2024-01-01, 2024-01-02, 2024-01-03)
  • ✅ commitAppId: "local-1765443996768" (populated)
  • ✅ commitAppName: "Spark shell" (populated)
  • ✅ commitOperation: "APPEND" (properly parsed)
  • ✅ Each event represents a different partition with correct commit metadata

5. publishPartitionStats Log Output

25/12/11 09:08:49 INFO spark.TableStatsCollectionSparkApp: Publishing partition stats for table: testdb.partition_stats_test (3 stats) 25/12/11 09:08:49 INFO spark.TableStatsCollectionSparkApp: [{"partitionData":[{"columnName":"event_time_day","value":"2024-01-01"}],"rowCount":2,"columnCount":4,"nullCount":[{"columnName":"event_time","value":0},{"columnName":"id","value":0},{"columnName":"name","value":0},{"columnName":"region","value":0}],"nanCount":[{"columnName":"event_time","value":0},{"columnName":"id","value":0},{"columnName":"name","value":0},{"columnName":"region","value":0}],"minValue":[{"columnName":"event_time","value":"2024-01-01 10:00:00.0"},{"columnName":"id","value":1},{"columnName":"name","value":"Alice"},{"columnName":"region","value":"EU"}],"maxValue":[{"columnName":"event_time","value":"2024-01-01 11:00:00.0"},{"columnName":"id","value":2},{"columnName":"name","value":"Bob"},{"columnName":"region","value":"US"}],"columnSizeInBytes":[{"columnName":"event_time","value":30},{"columnName":"id","value":12},{"columnName":"name","value":26},{"columnName":"region","value":22}],"dataset":{"databaseName":"testdb","tableName":"partition_stats_test","clusterName":"LocalHadoopCluster","tableMetadataLocation":"/data/openhouse/testdb/partition_stats_test-088a1368-1212-49b1-b3d9-b6cabdec290e","partitionSpec":"[\n 1000: event_time_day: day(4)\n]"},"commitMetadata":{"commitId":5642811578603876150,"commitTimestampMs":1765444061,"commitAppId":"local-1765443996768","commitAppName":"Spark shell","commitOperation":"APPEND"},"eventTimestampMs":1765444129782},{"partitionData":[{"columnName":"event_time_day","value":"2024-01-02"}],"rowCount":2,"columnCount":4,"nullCount":[{"columnName":"event_time","value":0},{"columnName":"id","value":0},{"columnName":"name","value":0},{"columnName":"region","value":0}],"nanCount":[{"columnName":"event_time","value":0},{"columnName":"id","value":0},{"columnName":"name","value":0},{"columnName":"region","value":0}],"minValue":[{"columnName":"event_time","value":"2024-01-02 10:00:00.0"},{"columnName":"id","value":3},{"columnName":"name","value":"Charlie"},{"columnName":"region","value":"APAC"}],"maxValue":[{"columnName":"event_time","value":"2024-01-02 11:00:00.0"},{"columnName":"id","value":4},{"columnName":"name","value":"David"},{"columnName":"region","value":"US"}],"columnSizeInBytes":[{"columnName":"event_time","value":30},{"columnName":"id","value":12},{"columnName":"name","value":30},{"columnName":"region","value":24}],"dataset":{"databaseName":"testdb","tableName":"partition_stats_test","clusterName":"LocalHadoopCluster","tableMetadataLocation":"/data/openhouse/testdb/partition_stats_test-088a1368-1212-49b1-b3d9-b6cabdec290e","partitionSpec":"[\n 1000: event_time_day: day(4)\n]"},"commitMetadata":{"commitId":7929592344081159299,"commitTimestampMs":1765444064,"commitAppId":"local-1765443996768","commitAppName":"Spark shell","commitOperation":"APPEND"},"eventTimestampMs":1765444129782},{"partitionData":[{"columnName":"event_time_day","value":"2024-01-03"}],"rowCount":1,"columnCount":4,"nullCount":[{"columnName":"event_time","value":0},{"columnName":"id","value":0},{"columnName":"name","value":0},{"columnName":"region","value":0}],"nanCount":[{"columnName":"event_time","value":0},{"columnName":"id","value":0},{"columnName":"name","value":0},{"columnName":"region","value":0}],"minValue":[{"columnName":"event_time","value":"2024-01-03 10:00:00.0"},{"columnName":"id","value":5},{"columnName":"name","value":"Eve"},{"columnName":"region","value":"EU"}],"maxValue":[{"columnName":"event_time","value":"2024-01-03 10:00:00.0"},{"columnName":"id","value":5},{"columnName":"name","value":"Eve"},{"columnName":"region","value":"EU"}],"columnSizeInBytes":[{"columnName":"event_time","value":15},{"columnName":"id","value":6},{"columnName":"name","value":12},{"columnName":"region","value":11}],"dataset":{"databaseName":"testdb","tableName":"partition_stats_test","clusterName":"LocalHadoopCluster","tableMetadataLocation":"/data/openhouse/testdb/partition_stats_test-088a1368-1212-49b1-b3d9-b6cabdec290e","partitionSpec":"[\n 1000: event_time_day: day(4)\n]"},"commitMetadata":{"commitId":8368973829645132323,"commitTimestampMs":1765444066,"commitAppId":"local-1765443996768","commitAppName":"Spark shell","commitOperation":"APPEND"},"eventTimestampMs":1765444129782}]

Key Points:

  • ✅ All 3 partition stats published successfully
  • ✅ Complete column-level metrics: nullCount, nanCount, minValue, maxValue, columnSizeInBytes
  • ✅ Partition data correctly captured (event_time_day: 2024-01-01, 2024-01-02, 2024-01-03)
  • ✅ Row counts accurate: 2, 2, 1 for respective partitions
  • ✅ Min/max values correctly computed per partition (Alice/Bob, Charlie/David, Eve)
  • ✅ Commit metadata properly associated with each partition stat
  • ✅ Latest commit info reflects the commit that created/modified each partition

6. Job Completion

2025-12-11 09:08:59 INFO OperationTask:233 - Finished job for entity TableMetadata(super=Metadata(creator=openhouse), dbName=testdb, tableName=partition_stats_test, ...): JobId TABLE_STATS_COLLECTION_testdb_partition_stats_test_83a5ebff-d232-4217-97d9-6a1da8881ddd, executionId 0, runTime 37322, queuedTime 13259, state SUCCEEDED

Key Points:

  • ✅ Job completed successfully: state SUCCEEDED
  • ✅ Total runtime: 37.3 seconds (including scheduler overhead)
  • ✅ Collection time: 23.1 seconds (synchronous execution)
  • ✅ All 4 publishing methods executed without errors

The Output section above:

✅ Shows all 4 publish methods (stats, commit events, partition events, partition stats)
✅ Includes the actual log output with JSON payloads
✅ Highlights the sequential execution pattern
✅ Provides key validation points for each publish method
✅ Demonstrates successful end-to-end execution against the local Docker setup

Key Features:

1. Synchronous Sequential Execution

  • All 4 collection tasks execute one after another in a predictable order:
    1. Table Stats Collection
    2. Commit Events Collection
    3. Partition Events Collection
    4. Partition Stats Collection
  • Each task waits for the previous to complete before starting
  • No CompletableFuture or parallel processing complexity
  • Example execution: Task 1 (9.7s) → Task 2 (2.3s) → Task 3 (3.3s) → Task 4 (7.9s) ≈ 23.1s total (see the sketch after this list)
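
A minimal, self-contained sketch of this sequential pattern (illustrative names only; not the exact code in TableStatsCollectionSparkApp):

import java.util.LinkedHashMap;
import java.util.Map;

// Each task runs strictly after the previous one completes; no CompletableFuture involved.
public class SequentialCollectionSketch {
  public static void main(String[] args) {
    String table = "testdb.partition_stats_test";

    Map<String, Runnable> tasks = new LinkedHashMap<>();
    tasks.put("table stats collection", () -> { /* collect + publish table stats */ });
    tasks.put("commit events collection", () -> { /* collect + publish commit events */ });
    tasks.put("partition events collection", () -> { /* collect + publish partition events */ });
    tasks.put("partition stats collection", () -> { /* collect + publish partition stats */ });

    long totalStart = System.currentTimeMillis();
    for (Map.Entry<String, Runnable> task : tasks.entrySet()) {
      long start = System.currentTimeMillis();
      System.out.printf("Starting %s for table: %s%n", task.getKey(), table);
      task.getValue().run();
      System.out.printf(
          "Completed %s for table: %s in %d ms%n",
          task.getKey(), table, System.currentTimeMillis() - start);
    }
    System.out.printf(
        "Total collection time for table: %s in %d ms%n",
        table, System.currentTimeMillis() - totalStart);
  }
}

The per-task and total timing messages above correspond to the log lines shown in the Sequential Execution Timeline section.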

2. Predictable Execution Order

  • Guaranteed sequential execution eliminates race conditions
  • Resources allocated and released in a controlled manner
  • Easier to debug with clear execution timeline
  • Simplified error handling - no need to coordinate failures across parallel tasks

3. Maintained Data Collection Functionality

  • ✅ Table stats collected and published (IcebergTableStats)
  • ✅ Commit events collected and published (CommitEventTable)
  • ✅ Partition events collected and published (CommitEventTablePartitions)
  • ✅ Partition stats collected and published (CommitEventTablePartitionStats)
  • All existing functionality preserved with synchronous execution pattern

4. Robust Error Handling

  • ✅ Null/empty results handled gracefully for each task
  • ✅ Publishing skipped if collection fails or returns no data (see the guard sketch after this list)
  • ✅ Unpartitioned tables handled correctly (empty partition events/stats)
  • ✅ Each task logs start/completion with timing information
  • ✅ Failures in one task don't impact subsequent tasks
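
A hedged sketch of the publish guard described above (names are illustrative, not the exact helpers in the app):

import java.util.List;

// Illustrative guard: publishing is skipped when a collection step failed or produced no data,
// e.g. partition events/stats for an unpartitioned table, without failing the remaining tasks.
public final class PublishGuards {
  private PublishGuards() {}

  public static <T> boolean shouldPublish(List<T> collected, String what, String table) {
    if (collected == null || collected.isEmpty()) {
      System.out.printf("Skipping %s publish for table: %s (no data collected)%n", what, table);
      return false;
    }
    return true;
  }
}

For example, a call like shouldPublish(partitionStats, "partition stats", "testdb.partition_stats_test") would return false for an unpartitioned table, and the publish step would simply be skipped.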

5. Performance Trade-off Accepted

  • Sequential execution: ~23 seconds (4 tasks in series)
  • Previous parallel execution: ~14 seconds (estimated)
  • Trade-off justification:
    • Resolves downstream repository execution errors
    • Can be optimized later if needed without changing the API

6. Comprehensive Timing Metrics

  • Individual task timing logged: "Completed [task] for table: [name] in [ms] ms"
  • Total collection time logged: "Total collection time for table: [name] in [ms] ms"
  • No misleading "parallel execution" message
  • Clear visibility into where time is spent

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

Testing Done

  • Manually tested on local Docker setup; see the End-to-End Verification (Docker) section above for the output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.

@srawat98-dev marked this pull request as ready for review on December 11, 2025 12:00
…tsCollectorUtil for clarity and consistency
@abhisheknath2011 (Member) left a comment


Thanks @srawat98-dev for addressing my comments. Yeah, let's have a follow-up refactoring PR for code readability.

… TableStatsCollectorUtil for improved accuracy and consistency
@cbb330 (Collaborator) left a comment


There are unresolved comments, but I reviewed them and all have either been explicitly responded to or implicitly resolved in code. Due to this, I'm going to preemptively resolve all the comments and add a shipit so that this PR is unblocked during @srawat98-dev's IST hours.

@abhisheknath2011 / @teamurko feel free to intercede / require a follow up if my conclusion is incorrect

@abhisheknath2011 merged commit b2d7d0e into linkedin:main on Jan 14, 2026
1 check passed
