
Silent Integer Overflow in AVRO Schema Evolution #417

@KasparMetsa

Description


Steps to Reproduce

All commands below are run from the local Docker Compose setup with Schema Registry in the KSML repository:

cd /Users/km/dev/ksml/docs/local-docker-compose-setup-with-sr

Phase 1: Produce Messages with Schema v2 (field as long with large values)

Step 1.1: Configure Schema v2 (sequence field as long)

Edit examples/SensorData.avsc and add a sequence field as long after the timestamp field:

{
  "namespace": "io.axual.ksml.example",
  "doc": "Emulated sensor data with a few additional attributes",
  "name": "SensorData",
  "type": "record",
  "fields": [
    {
      "doc": "The name of the sensor",
      "name": "name",
      "type": "string"
    },
    {
      "doc": "The timestamp of the sensor reading",
      "name": "timestamp",
      "type": "long"
    },
    {
      "doc": "The sequence number of this reading",
      "name": "sequence",
      "type": "long"
    },
    {
      "doc": "The value of the sensor, represented as string",
      "name": "value",
      "type": "string"
    },
    {
      "doc": "The type of the sensor",
      "name": "type",
      "type": {
        "name": "SensorType",
        "type": "enum",
        "doc": "The type of a sensor",
        "symbols": ["AREA", "HUMIDITY", "LENGTH", "STATE", "TEMPERATURE"]
      }
    },
    {
      "doc": "The unit of measurement",
      "name": "unit",
      "type": "string"
    },
    {
      "doc": "The color of the sensor",
      "name": "color",
      "type": ["null", "string"],
      "default": null
    },
    {
      "doc": "The city of the sensor",
      "name": "city",
      "type": ["null", "string"],
      "default": null
    },
    {
      "doc": "The owner of the sensor",
      "name": "owner",
      "type": ["null", "string"],
      "default": null
    }
  ]
}

Schema v2 has 9 fields, with sequence typed as long (a 64-bit signed integer).

Step 1.2: Update Producer to Include sequence Field (as long with large values)

Edit examples/phase1-producer-only.yaml and modify the generator function:

functions:
  generate_sensordata_message:
    type: generator
    globalCode: |
      import time
      import random
      messageCount = 0
    code: |
      global messageCount
      if messageCount >= 10:
        log.info("Producer stopped (count: 10 reached)")
        return None

      key = "sensor" + str(messageCount)
      messageCount += 1

      sensorTypes = ["TEMPERATURE", "HUMIDITY", "LENGTH", "AREA", "STATE"]
      units = ["C", "F", "m", "cm", "Pa"]
      colors = ["red", "blue", "green", None]
      cities = ["Amsterdam", "Rotterdam", "Utrecht", None]
      owners = ["Alice", "Bob", "Charlie", None]

      # Generate large long values that exceed int MAX (2,147,483,647)
      # Using values like 3,000,000,000 + messageCount
      largeSequence = 3000000000 + messageCount

      result = (key, {
        "name": key,
        "timestamp": int(time.time() * 1000),
        "sequence": largeSequence,  # long value > int MAX
        "value": str(round(random.uniform(10.0, 30.0), 2)),
        "type": random.choice(sensorTypes),
        "unit": random.choice(units),
        "color": random.choice(colors),
        "city": random.choice(cities),
        "owner": random.choice(owners)
      })

      log.info("========================================")
      log.info("PHASE 1: Producing message {}/10 with schema v2", messageCount)
      log.info("Key: {}", key)
      log.info("Fields (v2): sequence as LONG")
      log.info("Sequence value: {} (long, EXCEEDS int MAX 2,147,483,647)", largeSequence)
      log.info("========================================")

      time.sleep(2)
    expression: result
    resultType: (string, avro:SensorData)

producers:
  sensordata_producer:
    generator: generate_sensordata_message
    count: 10
    interval: 2s
    to:
      topic: sensor_data_avro
      keyType: string
      valueType: avro:SensorData

CRITICAL: The sequence values (3,000,000,001 to 3,000,000,010) exceed int's maximum value of 2,147,483,647!
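As a standalone sanity check (not part of the KSML codebase), a few lines of Java confirm that every generated sequence number lies above Integer.MAX_VALUE:

```java
public class SequenceRangeCheck {
    public static void main(String[] args) {
        // The producer generates 3_000_000_000 + messageCount for messageCount 1..10
        for (int messageCount = 1; messageCount <= 10; messageCount++) {
            long sequence = 3_000_000_000L + messageCount;
            // Every value exceeds the 32-bit signed maximum of 2_147_483_647
            System.out.println(sequence + " > Integer.MAX_VALUE: "
                    + (sequence > Integer.MAX_VALUE));
        }
    }
}
```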

Step 1.3: Configure for Phase 1

Edit examples/ksml-runner.yaml:

ksml:
  definitions:
    # PHASE 1: Uncomment this line
    phase1: phase1-producer-only.yaml

    # PHASE 2: Keep this commented
    #phase2: phase2-processor-only.yaml

    # DEFAULT: Comment these out
    #producer: producer.yaml
    #processor: processor.yaml

Step 1.4: Start Docker Compose

docker compose up -d && docker compose logs ksml -f

Step 1.5: Watch Producer Generate 10 Messages

You'll see output like:

========================================
PHASE 1: Producing message 1/10 with schema v2
Key: sensor0
Fields (v2): sequence as LONG
Sequence value: 3000000001 (long, EXCEEDS int MAX 2,147,483,647)
========================================

... (messages 2-9) ...

========================================
PHASE 1: Producing message 10/10 with schema v2
Key: sensor9
Fields (v2): sequence as LONG
Sequence value: 3000000010 (long, EXCEEDS int MAX 2,147,483,647)
========================================

Producer stopped (count: 10 reached)

Time: 20 seconds (2 seconds per message)

Step 1.6: Stop KSML

docker compose stop ksml

IMPORTANT: Stop KSML but keep Kafka and Schema Registry running!

Phase 1 Complete: 10 messages with schema v2 (sequence as long with values > int MAX) are in Kafka


Interlude: Demote Schema (Change long to int)

Step 2.1: Demote sequence Field from long to int

Edit examples/SensorData.avsc and change the sequence field type from long to int:

{
  "namespace": "io.axual.ksml.example",
  "doc": "Emulated sensor data with a few additional attributes",
  "name": "SensorData",
  "type": "record",
  "fields": [
    {
      "doc": "The name of the sensor",
      "name": "name",
      "type": "string"
    },
    {
      "doc": "The timestamp of the sensor reading",
      "name": "timestamp",
      "type": "long"
    },
    {
      "doc": "The sequence number of this reading",
      "name": "sequence",
      "type": "int"
    },
    {
      "doc": "The value of the sensor, represented as string",
      "name": "value",
      "type": "string"
    },
    {
      "doc": "The type of the sensor",
      "name": "type",
      "type": {
        "name": "SensorType",
        "type": "enum",
        "doc": "The type of a sensor",
        "symbols": ["AREA", "HUMIDITY", "LENGTH", "STATE", "TEMPERATURE"]
      }
    },
    {
      "doc": "The unit of measurement",
      "name": "unit",
      "type": "string"
    },
    {
      "doc": "The color of the sensor",
      "name": "color",
      "type": ["null", "string"],
      "default": null
    },
    {
      "doc": "The city of the sensor",
      "name": "city",
      "type": ["null", "string"],
      "default": null
    },
    {
      "doc": "The owner of the sensor",
      "name": "owner",
      "type": ["null", "string"],
      "default": null
    }
  ]
}

CHANGED: "type": "long" → "type": "int" for the sequence field

Step 2.2: Configure for Phase 2

Edit examples/ksml-runner.yaml:

ksml:
  definitions:
    # PHASE 1: Comment this out
    #phase1: phase1-producer-only.yaml

    # PHASE 2: Uncomment this line
    phase2: phase2-processor-only.yaml

    # DEFAULT: Keep these commented
    #producer: producer.yaml
    #processor: processor.yaml

Schema Change: v2 (sequence as long) → v1 (sequence as int). Note that per the Avro specification's schema resolution rules, long is not promotable to int (only int → long/float/double, long → float/double, and float → double are legal promotions), so a reader using the int schema should fail to resolve data written with long.


Phase 2: Read v2 Data with v1 Schema → Expected Failure, Actual Silent Overflow

Step 3.1: Start KSML

docker compose start ksml && docker compose logs ksml -f

Step 3.2: Expected Behavior - Forward Incompatibility Error

Expected Outcome: an AVRO deserialization error, since the int reader schema cannot legally resolve long values written with schema v2.

Actual Outcome: no error. The long values are silently narrowed to int and wrap around in two's complement: 3,000,000,010 becomes -1,294,967,286 (i.e. 3,000,000,010 - 2^32).

Messages like this get produced:

{
    "sequence": -1294967286,
    "unit": "cm",
    "owner": {
        "string": "Bob"
    },
    "name": "SENSOR9",
    "type": "TEMPERATURE",
    "timestamp": 1763602592011,
    "city": {
        "string": "Rotterdam"
    },
    "color": {
        "string": "blue"
    },
    "value": "16.51"
}
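The corrupted sequence value is exactly the two's-complement wraparound of the produced long. A minimal standalone sketch (not KSML code) reproducing the narrowing:

```java
public class NarrowingDemo {
    public static void main(String[] args) {
        long produced = 3_000_000_010L;            // written with schema v2 (long)
        int narrowed = (int) produced;             // silent narrowing keeps only the low 32 bits
        System.out.println(narrowed);              // -1294967286, matching the message above
        // For values above Integer.MAX_VALUE the wrap subtracts 2^32
        System.out.println(produced - (1L << 32)); // -1294967286
    }
}
```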

Proposed Fix

Add overflow detection in AvroDataObjectMapper.java:

case INT -> {
    if (value instanceof DataInteger v) return v.value();
    if (value instanceof DataByte v)
        return v.value() != null ? v.value().intValue() : null;
    if (value instanceof DataShort v)
        return v.value() != null ? v.value().intValue() : null;

    // ADD THIS:
    if (value instanceof DataLong v) {
        if (v.value() == null) return null;
        long longVal = v.value();

        if (longVal < Integer.MIN_VALUE || longVal > Integer.MAX_VALUE) {
            throw new DataException(
                "Value " + longVal + " exceeds INT range [" +
                Integer.MIN_VALUE + ", " + Integer.MAX_VALUE + "]. " +
                "Use 'long' type in schema or ensure values fit in INT range."
            );
        }

        return (int) longVal;
    }

    return fromDataObject(value);
}
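An alternative to the hand-rolled range check is java.lang.Math.toIntExact, which throws ArithmeticException on overflow. The custom DataException above carries a more actionable message, but the intended semantics can be sketched like this (standalone demo, not the actual KSML mapper):

```java
public class ToIntExactDemo {
    public static void main(String[] args) {
        // In range: narrows without loss
        System.out.println(Math.toIntExact(42L));
        // Out of range: fails loudly instead of wrapping silently
        try {
            Math.toIntExact(3_000_000_010L);
        } catch (ArithmeticException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```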

Apply a Similar Fix For:

  • FLOAT: add DataDouble validation (precision-loss detection)
  • SHORT: add DataInteger/DataLong validation (overflow detection)
  • BYTE: add DataShort/DataInteger/DataLong validation (overflow detection)
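One way to avoid duplicating the check across INT, SHORT, and BYTE is a shared range-check helper. This is only an illustrative sketch; the class and method names are hypothetical, not existing KSML APIs:

```java
public class RangeChecks {
    /** Returns the value unchanged if it fits in [min, max], otherwise fails loudly. */
    static long requireInRange(long value, long min, long max, String targetType) {
        if (value < min || value > max) {
            throw new IllegalArgumentException(
                "Value " + value + " exceeds " + targetType + " range [" + min + ", " + max + "]");
        }
        return value;
    }

    static int toInt(long v)     { return (int)   requireInRange(v, Integer.MIN_VALUE, Integer.MAX_VALUE, "INT"); }
    static short toShort(long v) { return (short) requireInRange(v, Short.MIN_VALUE,   Short.MAX_VALUE,   "SHORT"); }
    static byte toByte(long v)   { return (byte)  requireInRange(v, Byte.MIN_VALUE,    Byte.MAX_VALUE,    "BYTE"); }

    public static void main(String[] args) {
        System.out.println(toShort(123L));  // fits, narrows safely
        try {
            toInt(3_000_000_010L);          // overflows INT, throws instead of wrapping
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```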
