Description
Steps to Reproduce
```bash
cd /Users/km/dev/ksml/docs/local-docker-compose-setup-with-sr
```
Phase 1: Produce Messages with Schema v2 (field as long with large values)
Step 1.1: Configure Schema v2 (sequence field as long)
Edit examples/SensorData.avsc and add a sequence field as long after the timestamp field:
```json
{
  "namespace": "io.axual.ksml.example",
  "doc": "Emulated sensor data with a few additional attributes",
  "name": "SensorData",
  "type": "record",
  "fields": [
    {
      "doc": "The name of the sensor",
      "name": "name",
      "type": "string"
    },
    {
      "doc": "The timestamp of the sensor reading",
      "name": "timestamp",
      "type": "long"
    },
    {
      "doc": "The sequence number of this reading",
      "name": "sequence",
      "type": "long"
    },
    {
      "doc": "The value of the sensor, represented as string",
      "name": "value",
      "type": "string"
    },
    {
      "doc": "The type of the sensor",
      "name": "type",
      "type": {
        "name": "SensorType",
        "type": "enum",
        "doc": "The type of a sensor",
        "symbols": ["AREA", "HUMIDITY", "LENGTH", "STATE", "TEMPERATURE"]
      }
    },
    {
      "doc": "The unit of measurement",
      "name": "unit",
      "type": "string"
    },
    {
      "doc": "The color of the sensor",
      "name": "color",
      "type": ["null", "string"],
      "default": null
    },
    {
      "doc": "The city of the sensor",
      "name": "city",
      "type": ["null", "string"],
      "default": null
    },
    {
      "doc": "The owner of the sensor",
      "name": "owner",
      "type": ["null", "string"],
      "default": null
    }
  ]
}
```
Schema v2 has 9 fields, with sequence as long (64-bit integer).
Step 1.2: Update Producer to Include sequence Field (as long with large values)
Edit examples/phase1-producer-only.yaml and modify the generator function:
```yaml
functions:
  generate_sensordata_message:
    type: generator
    globalCode: |
      import time
      import random
      messageCount = 0
    code: |
      global messageCount
      if messageCount >= 10:
          log.info("Producer stopped (count: 10 reached)")
          return None
      key = "sensor" + str(messageCount)
      messageCount += 1
      sensorTypes = ["TEMPERATURE", "HUMIDITY", "LENGTH", "AREA", "STATE"]
      units = ["C", "F", "m", "cm", "Pa"]
      colors = ["red", "blue", "green", None]
      cities = ["Amsterdam", "Rotterdam", "Utrecht", None]
      owners = ["Alice", "Bob", "Charlie", None]
      # Generate large long values that exceed int MAX (2,147,483,647)
      # Using values like 3,000,000,000 + messageCount
      largeSequence = 3000000000 + messageCount
      result = (key, {
          "name": key,
          "timestamp": int(time.time() * 1000),
          "sequence": largeSequence,  # long value > int MAX
          "value": str(round(random.uniform(10.0, 30.0), 2)),
          "type": random.choice(sensorTypes),
          "unit": random.choice(units),
          "color": random.choice(colors),
          "city": random.choice(cities),
          "owner": random.choice(owners)
      })
      log.info("========================================")
      log.info("PHASE 1: Producing message {}/10 with schema v2", messageCount)
      log.info("Key: {}", key)
      log.info("Fields (v2): sequence as LONG")
      log.info("Sequence value: {} (long, EXCEEDS int MAX 2,147,483,647)", largeSequence)
      log.info("========================================")
      time.sleep(2)
    expression: result
    resultType: (string, avro:SensorData)

producers:
  sensordata_producer:
    generator: generate_sensordata_message
    count: 10
    interval: 2s
    to:
      topic: sensor_data_avro
      keyType: string
      valueType: avro:SensorData
```
CRITICAL: The sequence values (3,000,000,001 to 3,000,000,010) exceed int's maximum value of 2,147,483,647!
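To make the claim concrete: the generator computes `3000000000 + messageCount` after incrementing the counter, so messages 1 to 10 carry 3,000,000,001 to 3,000,000,010. The plain-Java sketch below replays that arithmetic and checks each value against the 32-bit `int` bounds. `SequenceRangeCheck` and its methods are illustrative names, not part of KSML.

```java
// Illustrative sketch (not KSML code): replays the Phase 1 generator arithmetic
// and checks whether each sequence value fits in a signed 32-bit int.
final class SequenceRangeCheck {

    // Base value used by the generator in phase1-producer-only.yaml
    static final long BASE = 3_000_000_000L;

    // True if the value lies within [Integer.MIN_VALUE, Integer.MAX_VALUE].
    static boolean fitsInInt(long value) {
        return value >= Integer.MIN_VALUE && value <= Integer.MAX_VALUE;
    }

    // Sequence value emitted for message number n (1-based, as in the logs).
    static long sequenceFor(int n) {
        return BASE + n;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 10; n++) {
            long seq = sequenceFor(n);
            System.out.printf("message %2d: sequence=%d fitsInInt=%b%n", n, seq, fitsInInt(seq));
        }
        System.out.println("Integer.MAX_VALUE = " + Integer.MAX_VALUE);
    }
}
```

Running `main` reports `fitsInInt=false` for all ten messages.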
Step 1.3: Configure for Phase 1
Edit examples/ksml-runner.yaml:
```yaml
ksml:
  definitions:
    # PHASE 1: Uncomment this line
    phase1: phase1-producer-only.yaml
    # PHASE 2: Keep this commented
    #phase2: phase2-processor-only.yaml
    # DEFAULT: Comment these out
    #producer: producer.yaml
    #processor: processor.yaml
```
Step 1.4: Start Docker Compose
```bash
docker compose up -d && docker compose logs ksml -f
```
Step 1.5: Watch Producer Generate 10 Messages
You'll see output like:
```
========================================
PHASE 1: Producing message 1/10 with schema v2
Key: sensor0
Fields (v2): sequence as LONG
Sequence value: 3000000001 (long, EXCEEDS int MAX 2,147,483,647)
========================================
... (messages 2-9) ...
========================================
PHASE 1: Producing message 10/10 with schema v2
Key: sensor9
Fields (v2): sequence as LONG
Sequence value: 3000000010 (long, EXCEEDS int MAX 2,147,483,647)
========================================
Producer stopped (count: 10 reached)
```
Time: 20 seconds (2 seconds per message)
Step 1.6: Stop KSML
```bash
docker compose stop ksml
```
IMPORTANT: Stop KSML but keep Kafka and Schema Registry running!
✅ Phase 1 Complete: 10 messages with schema v2 (sequence as long with values > int MAX) are in Kafka
Interlude: Demote Schema (Change long to int)
Step 2.1: Demote sequence Field from long to int
Edit examples/SensorData.avsc and change the sequence field type from long to int:
```json
{
  "namespace": "io.axual.ksml.example",
  "doc": "Emulated sensor data with a few additional attributes",
  "name": "SensorData",
  "type": "record",
  "fields": [
    {
      "doc": "The name of the sensor",
      "name": "name",
      "type": "string"
    },
    {
      "doc": "The timestamp of the sensor reading",
      "name": "timestamp",
      "type": "long"
    },
    {
      "doc": "The sequence number of this reading",
      "name": "sequence",
      "type": "int"
    },
    {
      "doc": "The value of the sensor, represented as string",
      "name": "value",
      "type": "string"
    },
    {
      "doc": "The type of the sensor",
      "name": "type",
      "type": {
        "name": "SensorType",
        "type": "enum",
        "doc": "The type of a sensor",
        "symbols": ["AREA", "HUMIDITY", "LENGTH", "STATE", "TEMPERATURE"]
      }
    },
    {
      "doc": "The unit of measurement",
      "name": "unit",
      "type": "string"
    },
    {
      "doc": "The color of the sensor",
      "name": "color",
      "type": ["null", "string"],
      "default": null
    },
    {
      "doc": "The city of the sensor",
      "name": "city",
      "type": ["null", "string"],
      "default": null
    },
    {
      "doc": "The owner of the sensor",
      "name": "owner",
      "type": ["null", "string"],
      "default": null
    }
  ]
}
```
CHANGED: "type": "long" → "type": "int" for the sequence field
Step 2.2: Configure for Phase 2
Edit examples/ksml-runner.yaml:
```yaml
ksml:
  definitions:
    # PHASE 1: Comment this out
    #phase1: phase1-producer-only.yaml
    # PHASE 2: Uncomment this line
    phase2: phase2-processor-only.yaml
    # DEFAULT: Keep these commented
    #producer: producer.yaml
    #processor: processor.yaml
```
Schema Change: v2 (sequence as long) → v1 (sequence as int)
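Why an error is expected here: the Avro specification's schema resolution rules allow `int` data to be promoted to `long`, `float` or `double` when read, but define no rule for reading `long` data with an `int` reader schema. The sketch below encodes only the primitive promotion rules from that section of the spec. `AvroPromotion` and its `Primitive` enum are hypothetical helpers, not Avro or KSML API.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper modelling the primitive promotions listed in the
// "Schema Resolution" section of the Avro specification.
final class AvroPromotion {

    enum Primitive { INT, LONG, FLOAT, DOUBLE, STRING, BYTES }

    // Writer type -> reader types it may be promoted to (in addition to itself).
    private static final Map<Primitive, Set<Primitive>> PROMOTIONS = Map.of(
        Primitive.INT, EnumSet.of(Primitive.LONG, Primitive.FLOAT, Primitive.DOUBLE),
        Primitive.LONG, EnumSet.of(Primitive.FLOAT, Primitive.DOUBLE),
        Primitive.FLOAT, EnumSet.of(Primitive.DOUBLE),
        Primitive.STRING, EnumSet.of(Primitive.BYTES),
        Primitive.BYTES, EnumSet.of(Primitive.STRING)
    );

    // True if data written with `writer` may be read with `reader`.
    static boolean canRead(Primitive writer, Primitive reader) {
        return writer == reader
            || PROMOTIONS.getOrDefault(writer, Set.of()).contains(reader);
    }

    public static void main(String[] args) {
        // v1 data read with v2 (int -> long): allowed
        System.out.println("int  -> long: " + canRead(Primitive.INT, Primitive.LONG));
        // v2 data read with v1 (long -> int), the case in this issue: not allowed
        System.out.println("long -> int : " + canRead(Primitive.LONG, Primitive.INT));
    }
}
```

Only primitive promotions are modelled. Records, unions, enums and the other resolution rules in the spec are outside the scope of this sketch.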
Phase 2: Read v2 Data with v1 Schema → FAIL!
Step 3.1: Start KSML
```bash
docker compose start ksml && docker compose logs ksml -f
```
Step 3.2: Expected Behavior - Forward Incompatibility Error
Expected Outcome: Avro deserialization error
Actual Outcome: No error. Sequence values outside the int range are silently truncated instead of rejected.
Messages like the following are produced:
```json
{
  "sequence": -1294967286,
  "unit": "cm",
  "owner": {
    "string": "Bob"
  },
  "name": "SENSOR9",
  "type": "TEMPERATURE",
  "timestamp": 1763602592011,
  "city": {
    "string": "Rotterdam"
  },
  "color": {
    "string": "blue"
  },
  "value": "16.51"
}
```
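The value `-1294967286` is exactly what a silent 32-bit narrowing of the Phase 1 value `3000000010` (message 10, key `sensor9`) yields: 3,000,000,010 - 4,294,967,296 = -1,294,967,286. The sketch below reproduces the arithmetic with a plain Java cast. It does not claim where in KSML or Avro the narrowing happens, and `NarrowingDemo` is an illustrative name.

```java
// Illustrative sketch: a long -> int cast keeps only the low 32 bits,
// which turns 3000000010 into the negative value seen in the output above.
final class NarrowingDemo {

    // Silent narrowing: the high 32 bits are discarded without any error.
    static int truncate(long value) {
        return (int) value;
    }

    // Reinterprets the 32 retained bits as an unsigned number.
    static long lowBitsUnsigned(int value) {
        return Integer.toUnsignedLong(value);
    }

    public static void main(String[] args) {
        long written = 3_000_000_010L;                 // sequence of message 10 (key sensor9)
        int narrowed = truncate(written);
        System.out.println(narrowed);                  // -1294967286
        System.out.println(written - (1L << 32));      // -1294967286, i.e. wrapped by 2^32
        System.out.println(lowBitsUnsigned(narrowed)); // 3000000010
    }
}
```

Because the original value here is below 2^32, the low bits still encode it, but nothing in the produced message signals that a wrap occurred.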
Proposed Fix
Add overflow detection in AvroDataObjectMapper.java:
```java
case INT -> {
    if (value instanceof DataInteger v) return v.value();
    if (value instanceof DataByte v)
        return v.value() != null ? v.value().intValue() : null;
    if (value instanceof DataShort v)
        return v.value() != null ? v.value().intValue() : null;
    // ADD THIS:
    if (value instanceof DataLong v) {
        if (v.value() == null) return null;
        long longVal = v.value();
        if (longVal < Integer.MIN_VALUE || longVal > Integer.MAX_VALUE) {
            throw new DataException(
                "Value " + longVal + " exceeds INT range [" +
                Integer.MIN_VALUE + ", " + Integer.MAX_VALUE + "]. " +
                "Use 'long' type in schema or ensure values fit in INT range."
            );
        }
        return (int) longVal;
    }
    return fromDataObject(value);
}
```
Apply Similar Fix For:
- FLOAT: Add DataDouble validation (precision loss detection)
- SHORT: Add DataInteger/DataLong validation (overflow detection)
- BYTE: Add DataShort/DataInteger/DataLong validation (overflow detection)
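The checks listed above could follow the same pattern as the proposed `INT` case. The JDK-only sketch below uses hypothetical names (`CheckedNarrowing` and its methods) and throws `ArithmeticException` where the real mapper would presumably throw KSML's `DataException`. For `INT` it uses `Math.toIntExact`, which performs the same range check as the hand-written comparison in the proposal.

```java
// Hypothetical sketch of checked narrowing conversions; not KSML code.
final class CheckedNarrowing {

    // INT: Math.toIntExact throws ArithmeticException if the long does not fit.
    static int toInt(long value) {
        return Math.toIntExact(value);
    }

    // SHORT: reject values outside [Short.MIN_VALUE, Short.MAX_VALUE].
    static short toShort(long value) {
        if (value < Short.MIN_VALUE || value > Short.MAX_VALUE) {
            throw new ArithmeticException("Value " + value + " is outside SHORT range");
        }
        return (short) value;
    }

    // BYTE: reject values outside [Byte.MIN_VALUE, Byte.MAX_VALUE].
    static byte toByte(long value) {
        if (value < Byte.MIN_VALUE || value > Byte.MAX_VALUE) {
            throw new ArithmeticException("Value " + value + " is outside BYTE range");
        }
        return (byte) value;
    }

    // FLOAT: reject doubles that change when rounded to float
    // (precision loss, overflow to infinity, or underflow to zero).
    // NaN is passed through because NaN never compares equal to itself.
    static float toFloat(double value) {
        float f = (float) value;
        if (!Double.isNaN(value) && (double) f != value) {
            throw new ArithmeticException("Value " + value + " cannot be represented exactly as FLOAT");
        }
        return f;
    }

    public static void main(String[] args) {
        System.out.println(toInt(42L));
        try {
            toInt(3_000_000_010L);
        } catch (ArithmeticException e) {
            System.out.println("INT rejected: " + e.getMessage());
        }
    }
}
```

The `toFloat` check treats any rounding as precision loss, so common decimals such as `0.1` are rejected. If that is too strict for real sensor data, a relative-tolerance comparison would be the alternative design.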