Skip to content

Conversation

@nikhilsinhaparseable
Copy link
Contributor

@nikhilsinhaparseable nikhilsinhaparseable commented Jan 8, 2026

all fields prefixed with @ will be renamed to have _ prefix
this is to make field queryable

Summary by CodeRabbit

  • New Features
    • Events with field names beginning with '@' are now automatically normalized to use '_' prefix for improved schema alignment and field matching.

✏️ Tip: You can customize this high-level summary in your review settings.

all fields prefixed with `@` will be renamed to have `_` prefix
this is to make field queryable
@nitisht nitisht requested a review from parmesant January 8, 2026 10:28
@nitisht nitisht changed the title normalise field name: change prefix from @ to _ normalise field name: change prefix from @ to _ to allow proper querying Jan 8, 2026
@nitisht nitisht closed this Jan 8, 2026
@nitisht nitisht reopened this Jan 8, 2026
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jan 8, 2026

Walkthrough

A new public utility function normalize_field_name is introduced to rename field strings by replacing a leading '@' with '_'. This normalization is then applied at multiple points: in JSON key transformation via a new private function rename_json_keys, in field type matching during data override, and in field lookup operations for schema validation.

Changes

Cohort / File(s) Summary
Field Name Normalization Utility
src/event/format/mod.rs
Added public function normalize_field_name(name: &mut String) to transform field names by replacing '@' prefix with '_'. Applied in override_data_type to normalize field names during field matching and type upgrades.
JSON Key Normalization
src/event/format/json.rs
Added private function rename_json_keys to normalize keys in JSON values before processing. Applied normalization in to_data to transform incoming JSON keys and in fields_mismatch to normalize lookup names during field validation.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~15 minutes

Poem

🐰 A hop through fields, a field through hops,
We swap the '@' for '_' without a stop,
Field names dance as schemas align,
Normalization blooms—a design so fine! ✨

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 66.67% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Description check ❓ Inconclusive The description is minimal but lacks required template sections like rationale, key changes, and testing/documentation checkboxes. Expand the description to include the rationale for the change, detailed key changes, and complete the template checklist items.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately describes the main change: renaming fields with @ prefix to use _ prefix for proper querying.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In @src/event/format/json.rs:
- Around line 121-122: collect_keys currently returns raw field names (e.g.,
"@foo") which are only normalized later by rename_json_keys, causing
derive_arrow_schema to miss matches and re-infer schemas; fix by normalizing the
collected keys immediately after collect_keys and before calling
derive_arrow_schema (i.e., pass rename_json_keys(collect_keys(...)) into
derive_arrow_schema or call rename_json_keys on the keys array returned by
collect_keys), ensuring functions rename_json_keys, collect_keys, and
derive_arrow_schema are updated accordingly so schema lookup uses the normalized
names.
🧹 Nitpick comments (2)
src/event/format/mod.rs (2)

60-67: Consider optimizing to avoid allocation.

The format! macro allocates a new String. Since the PR aims to "avoid extra allocation by mutating the var," consider using replace_range for in-place mutation:

⚡ More efficient in-place mutation
 #[inline]
 pub fn normalize_field_name(name: &mut String) {
-    if let Some(stripped) = name.strip_prefix('@') {
-        *name = format!("_{}", stripped);
+    if name.starts_with('@') {
+        name.replace_range(0..1, "_");
     }
 }

347-376: Logic is correct but consider adding clarification.

The normalization flow is correct: the field name is normalized for schema creation and pattern matching (line 357), while the original field name is used for value lookup from the JSON map (line 350). This works because JSON key normalization happens later in the pipeline via rename_json_keys.

Consider adding a brief comment at line 347-349 to clarify this subtle but important distinction for future maintainers.

📝 Suggested clarifying comment
         .map(|field| {
+            // Normalize field name for schema - JSON keys are normalized later in the pipeline
             let mut field_name = field.name().to_string();
             normalize_field_name(&mut field_name);
             match (schema_version, map.get(field.name())) {
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 31a3ad9 and 277674d.

📒 Files selected for processing (2)
  • src/event/format/json.rs
  • src/event/format/mod.rs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-18T09:59:20.177Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:700-756
Timestamp: 2025-09-18T09:59:20.177Z
Learning: In src/event/mod.rs, the parsed_timestamp used in increment_events_ingested_by_date() is correctly UTC-normalized: for dynamic streams it remains Utc::now(), and for streams with time partition enabled it uses the time partition value. Both cases result in proper UTC date strings for metrics labeling, preventing double-counting issues.

Applied to files:

  • src/event/format/mod.rs
🧬 Code graph analysis (1)
src/event/format/json.rs (2)
src/event/format/mod.rs (1)
  • normalize_field_name (63-67)
src/utils/arrow/mod.rs (1)
  • get_field (76-84)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka aarch64-apple-darwin
🔇 Additional comments (1)
src/event/format/json.rs (1)

294-296: LGTM!

The normalization here is necessary and correct since the schema being validated against contains normalized field names (with "_" prefix instead of "@").

Comment on lines +121 to +122
// Rename JSON keys starting with '@' to '_' to match the schema
let value_arr = rename_json_keys(value_arr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

cd src/event/format && wc -l json.rs

Repository: parseablehq/parseable

Length of output: 76


🏁 Script executed:

cd src/event/format && head -150 json.rs | tail -50

Repository: parseablehq/parseable

Length of output: 1820


🏁 Script executed:

cd src/event/format && sed -n '70,130p' json.rs

Repository: parseablehq/parseable

Length of output: 2711


🏁 Script executed:

rg -n "fn collect_keys" --type rust

Repository: parseablehq/parseable

Length of output: 184


🏁 Script executed:

rg -n "fn derive_arrow_schema" --type rust

Repository: parseablehq/parseable

Length of output: 117


🏁 Script executed:

rg -n "fn rename_json_keys" --type rust

Repository: parseablehq/parseable

Length of output: 149


🏁 Script executed:

sed -n '231,243p' src/event/format/json.rs

Repository: parseablehq/parseable

Length of output: 454


🏁 Script executed:

sed -n '244,263p' src/event/format/json.rs

Repository: parseablehq/parseable

Length of output: 691


🏁 Script executed:

sed -n '264,290p' src/event/format/json.rs

Repository: parseablehq/parseable

Length of output: 868


🏁 Script executed:

rg -n "fn normalize_field_name" --type rust -A 5

Repository: parseablehq/parseable

Length of output: 379


🏁 Script executed:

rg -n "stream_schema" src/event/format/json.rs --type rust -B 2 -A 2

Repository: parseablehq/parseable

Length of output: 1112


🏁 Script executed:

rg -n "stream_schema\|schema_schema" src/event/format/json.rs --type rust -B 3 -A 3 | head -60

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg -n "HashMap.*Field\|stream_schema.*insert" src/event/format/json.rs --type rust -B 2 -A 2

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg -n "pub fn into_event\|fn load_schema" --type rust | head -20

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

sed -n '55,80p' src/event/format/json.rs

Repository: parseablehq/parseable

Length of output: 1136


🏁 Script executed:

rg -n "impl.*Handler.*for.*JsonEvent" src/event/format/json.rs --type rust -B 2 -A 15

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg -n "HashMap<String, Arc<Field>>" --type rust | head -20

Repository: parseablehq/parseable

Length of output: 1416


🏁 Script executed:

rg -n "load_schema\|get_schema\|create_stream_and_schema" --type rust -B 2 -A 5 | head -40

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

sed -n '869,890p' src/streams.rs

Repository: parseablehq/parseable

Length of output: 122


🏁 Script executed:

rg -n "get_schema_raw" src/parseable --type rust -B 3 -A 10 | head -60

Repository: parseablehq/parseable

Length of output: 961


🏁 Script executed:

rg -n "infer_json_schema_from_iterator\|update_field_type_in_schema" src/event/format --type rust -B 2 -A 5 | head -80

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

sed -n '82,110p' src/event/format/json.rs

Repository: parseablehq/parseable

Length of output: 1443


🏁 Script executed:

rg -n "schema.insert\|fields_to_map\|update.*schema" src/handlers/http/ingest.rs --type rust -B 3 -A 8 | head -80

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

sed -n '594,620p' src/handlers/http/ingest.rs

Repository: parseablehq/parseable

Length of output: 861


🏁 Script executed:

rg -n "Metadata\|serialize_schema\|schema.clone\|write_schema" src/metadata.rs --type rust -B 2 -A 5 | head -100

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

sed -n '75,115p' src/metadata.rs

Repository: parseablehq/parseable

Length of output: 1421


🏁 Script executed:

rg -n "write.*metadata\|save_schema\|update_schema" src/parseable --type rust -B 3 -A 8 | head -120

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg -n "fn create.*stream\|LogStreamMetadata::new" src/parseable/streams.rs --type rust -B 5 -A 15 | head -100

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg -n "into_recordbatch\|into_event" src/event/format/json.rs --type rust -B 3 -A 20 | head -80

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

sed -n '140,180p' src/event/format/json.rs

Repository: parseablehq/parseable

Length of output: 1427


🏁 Script executed:

rg -n "fn create_stream\|schema.*insert\|metadata.*write" src/handlers/http/ingest.rs --type rust -B 5 -A 15 | head -150

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg -n "create_stream_and_schema" src --type rust -B 5 -A 20 | head -100

Repository: parseablehq/parseable

Length of output: 6112


🏁 Script executed:

rg -n "schema.*persist\|save.*field\|store.*schema" src/parseable --type rust -B 5 -A 10 | head -80

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg -n "create_schema_from_metastore\|persist_schema\|set_schema" src/parseable --type rust -B 3 -A 8

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg -n "normalize_field_name" src --type rust -B 5 -A 5

Repository: parseablehq/parseable

Length of output: 3561


🏁 Script executed:

sed -n '1,60p' src/event/format/json.rs

Repository: parseablehq/parseable

Length of output: 1954


Normalize collected keys before schema lookup to avoid unnecessary schema inference.

For existing streams, the stored schema uses normalized field names (e.g., _foo instead of @foo). However, collect_keys extracts field names as-is from the incoming JSON. When derive_arrow_schema attempts to match these non-normalized keys against the stored schema, the lookup fails and triggers a full schema inference path.

This causes unnecessary overhead on every event containing "@" prefixed fields, since:

  1. collect_keys extracts "@foo"
  2. derive_arrow_schema tries to find "@foo" in schema (which has "_foo")
  3. Lookup fails, schema inference is triggered
  4. Keys are only normalized later at line 121 via rename_json_keys

Normalize the collected keys before passing them to derive_arrow_schema to avoid repeated schema inference for existing streams.

🤖 Prompt for AI Agents
In @src/event/format/json.rs around lines 121 - 122, collect_keys currently
returns raw field names (e.g., "@foo") which are only normalized later by
rename_json_keys, causing derive_arrow_schema to miss matches and re-infer
schemas; fix by normalizing the collected keys immediately after collect_keys
and before calling derive_arrow_schema (i.e., pass
rename_json_keys(collect_keys(...)) into derive_arrow_schema or call
rename_json_keys on the keys array returned by collect_keys), ensuring functions
rename_json_keys, collect_keys, and derive_arrow_schema are updated accordingly
so schema lookup uses the normalized names.

Comment on lines +263 to +282
/// Renames JSON keys to match the schema transformation using normalize_field_name
fn rename_json_keys(values: Vec<Value>) -> Vec<Value> {
values
.into_iter()
.map(|value| {
if let Value::Object(map) = value {
let new_map: serde_json::Map<String, Value> = map
.into_iter()
.map(|(mut key, val)| {
super::normalize_field_name(&mut key);
(key, val)
})
.collect();
Value::Object(new_map)
} else {
value
}
})
.collect()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Implementation is correct, but consider key collision edge case.

The function correctly normalizes JSON object keys. However, if an input object contains both "@foo" and "_foo", they will collide after normalization, potentially causing data loss (the second insertion would overwrite the first).

While this edge case may be rare, consider whether validation or error handling is needed:

🛡️ Potential collision detection
 fn rename_json_keys(values: Vec<Value>) -> Vec<Value> {
     values
         .into_iter()
         .map(|value| {
             if let Value::Object(map) = value {
+                let mut seen_keys = std::collections::HashSet::new();
                 let new_map: serde_json::Map<String, Value> = map
                     .into_iter()
                     .map(|(mut key, val)| {
+                        let original_key = key.clone();
                         super::normalize_field_name(&mut key);
+                        if !seen_keys.insert(key.clone()) {
+                            tracing::warn!("Key collision detected: '{}' normalizes to existing key '{}'", original_key, key);
+                        }
                         (key, val)
                     })
                     .collect();
                 Value::Object(new_map)
             } else {
                 value
             }
         })
         .collect()
 }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants