-
Notifications
You must be signed in to change notification settings - Fork 1
Update Consumer Data Extraction #29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Conversation
… new nested `TravelerInformation` node
…consumer-data-extraction
… data to match OdeTimJson new format
…or TIM payload conversion
…ta and metadata formats
| } | ||
|
|
||
| public boolean updateTimExpiration(String packetID, String expDate) { | ||
| public boolean updateTimExpiration(String packetID, Instant expDate) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: these changes are due to the database being unable to read the expiration date properly. In order for the prepared statement to work, it needed to be converted into a time stamp for the database.
| return region; | ||
| } | ||
|
|
||
| public OdeTimPayload convertTimPayloadJsonToJava(String value) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: the removal of this class is due to the fact that with the changes to update how the string value from the Kafka topic caused it to match up almost exactly with the other method. The only difference had been that this method had not set the StartDateTime, DurationTime, Priority, or FrameType, however, these are required fields so should be set anyway.
dmccoystephenson
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me! Great job!
Verifications made:
- Successful compilation
- Unit tests pass
- Logger Kafka Consumer successfully processes TIM generated using "Incident" TIM body provided in PR description with the following output:
logger-kafka-consumer-1 | 2026-01-23 19:13:13 [main] [,] INFO LoggerKafkaConsumer - Polling found 1 new record(s) to process
logger-kafka-consumer-1 | 2026-01-23 19:13:13 [main] [,] DEBUG LoggerKafkaConsumer - Processing record from partition=0, offset=15
logger-kafka-consumer-1 | 2026-01-23 19:13:13 [main] [,] INFO LoggerKafkaConsumer - Processing message for topic: topic.OdeTimJson
logger-kafka-consumer-1 | 2026-01-23 19:13:14 [main] [,] DEBUG LoggerKafkaConsumer - Processing TIM generated by TMC
logger-kafka-consumer-1 | 2026-01-23 19:13:14 [main] [,] INFO TimService - Processing active TIM to add to database
logger-kafka-consumer-1 | 2026-01-23 19:13:14 [main] [,] DEBUG TimService - Extracting TIM information from region name: 'I_025A_DEC_SAT-9E70E3AD_I_test-I-12559_1'
logger-kafka-consumer-1 | 2026-01-23 19:13:14 [main] [,] INFO DbInteractions - Creating connection pool with the following configuration:
logger-kafka-consumer-1 | 2026-01-23 19:13:14 [main] [,] INFO DbInteractions - driverClassName: org.postgresql.Driver
logger-kafka-consumer-1 | 2026-01-23 19:13:14 [main] [,] INFO DbInteractions - dbUrl: jdbc:postgresql://postgres:5432/postgres?user=username
logger-kafka-consumer-1 | 2026-01-23 19:13:14 [main] [,] INFO DbInteractions - dbUsername: username
logger-kafka-consumer-1 | 2026-01-23 19:13:14 [main] [,] INFO DbInteractions - connectionTimeout: 300000
logger-kafka-consumer-1 | 2026-01-23 19:13:14 [main] [,] INFO DbInteractions - maximumPoolSize: 13
logger-kafka-consumer-1 | 2026-01-23 19:13:15 [main] [,] INFO DbInteractions - Successfully initialized connection pool
logger-kafka-consumer-1 | 2026-01-23 19:13:15 [main] [,] DEBUG TimService - Extracted information - Route: 025A, Direction: I, ClientId: I, TimType: null
logger-kafka-consumer-1 | 2026-01-23 19:13:15 [main] [,] DEBUG TimService - Checking if TIM already exists with packet ID: 1B28432C09D015FEC7 and timestamp: 2026-01-23 19:13:00.0
logger-kafka-consumer-1 | 2026-01-23 19:13:15 [main] [,] INFO TimService - TIM not found in database, adding new TIM with packet ID: 1B28432C09D015FEC7
logger-kafka-consumer-1 | 2026-01-23 19:13:15 [main] [,] DEBUG TimService - Successfully added TIM with ID: 161245
logger-kafka-consumer-1 | 2026-01-23 19:13:15 [main] [,] DEBUG TimService - Set satellite record ID from metadata: 9E70E3AD for TIM ID: 161245
logger-kafka-consumer-1 | 2026-01-23 19:13:15 [main] [,] DEBUG TimService - Using metadata start time: 2026-01-23T19:13:09.956Z for TIM ID: 161245
logger-kafka-consumer-1 | 2026-01-23 19:13:15 [main] [,] DEBUG TimService - Processing SDX TIM with satellite record ID: 9E70E3AD for TIM ID: 161245
logger-kafka-consumer-1 | 2026-01-23 19:13:15 [main] [,] WARN TimService - No active TIM holding found for SAT TIM with client ID: 'I', direction: 'I', satellite record ID: '9E70E3AD'
logger-kafka-consumer-1 | 2026-01-23 19:13:15 [main] [,] DEBUG TimService - TIM ID: 161245 has indefinite duration (32000 minutes)
logger-kafka-consumer-1 | 2026-01-23 19:13:15 [main] [,] DEBUG TimService - No desired end time found in active TIM holding for TIM ID: 161245, setting to null
logger-kafka-consumer-1 | 2026-01-23 19:13:15 [main] [,] INFO TimService - Inserting new active TIM for TIM ID: 161245 (not from WYDOT application - no TimType found)
logger-kafka-consumer-1 | 2026-01-23 19:13:15 [main] [,] DEBUG ActiveTimService - Inserting active_tim record with client_id = I and sat_record_id = 9E70E3AD
logger-kafka-consumer-1 | 2026-01-23 19:13:15 [main] [,] INFO ActiveTimService - Converting 2026-01-23T19:13:09.956Z for TIM_START value
logger-kafka-consumer-1 | 2026-01-23 19:13:15 [main] [,] INFO TimService - Successfully processed active TIM with ID: 161245 (packet ID: 1B28432C09D015FEC7)
logger-kafka-consumer-1 | 2026-01-23 19:13:15 [main] [,] DEBUG LoggerKafkaConsumer - Successfully added active TIM to database
- New active_tim record shows up as expected:
161245,,,I,2026-01-23 19:13:09.956000,,,025A,I,9E70E3AD,,11404,,,,,,,false
| } | ||
|
|
||
| @Test | ||
| public void TestConvertTimPayloadJsonToJava_Path_MultipleRegions() throws IOException, URISyntaxException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: the multiple region capacity is an important scenario to show support for - is there a reason to remove the test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have other tests that tests for multiple regions so I didn't think there was a need for this test as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
referring to these tests:
Line 224 in 8437882
| public void TestConvertTimTopicJsonToJava_HandlesVslContentType_MultipleRegions() throws IOException { |
Line 260 in 8437882
| public void TestConvertTimTopicJsonToJava_HandlesParkingContentType_MultipleRegions() throws IOException { |
Line 297 in 8437882
| public void TestConvertTimTopicJsonToJava_HandlesConstructionContentType_MultipleRegions() throws IOException { |
| } | ||
|
|
||
| @Test | ||
| public void TestConvertTimPayloadJsonToJava_SpeedLimit() throws IOException, URISyntaxException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: speed limit is a very common TIM we need to support, is there a reason to remove the tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the speed limit type is very similar to the VSL type, so I figured having tests covering the VSL type would suffice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| geometry.setDirection("1010101010101010"); | ||
| geometry.setExtent(1);// this is an enum | ||
| geometry.setLaneWidth(BigDecimal.valueOf(33)); | ||
| geometry.setDirection("F0F0"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: direction is a HeadingSlice (which we're moving away from using in favor of directionality), what does "F0F0" represent here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this just the hex representation of the heading slice? It doesnt' seem to be the same as previous direction, are we just adding new data for all the tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to add new data for all the tests since the data structure changed so much. The direction "F0F0" was what came out of the ODE.
| if (ath != null) { | ||
| log.debug("Found record in holding table, updating expiration"); | ||
| success = activeTimHoldingService.updateTimExpiration(certExpirationModel.getPacketID(), certExpirationModel.getExpirationDate()); | ||
| success = activeTimHoldingService.updateTimExpiration(certExpirationModel.getPacketID(), Instant.parse(certExpirationModel.getExpirationDate())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: will we ever see a null/empty certExpirationModel.expirationDate and what's the consequence of that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch. I adjusted it so that, in the case of a null expiration date, it will query for the null value like it used to rather then throwing the null pointer exception.
| @@ -58,7 +57,7 @@ public OdeData processTimJson(String value) { | |||
| public OdeData translateTimJson(String value) { | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: references to the OdeData now show: OdeData is a raw type. References to generic type OdeData<MetadataT,PayloadT> should be parameterized Java(16777788)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
payneBrandon
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
couple questions, overall looking good!
…prevent null pointer exception
PR Details
Description
With the update in the ODE, the format in which the Logger Kafka Consumer is consuming has changed. This causes all TIMs going through the TIM Manager to be unable to be inserted into the Active TIMs Table when they should be. The solution is to adjust the format in which the kafka Logger Consumer is navigating to get data processed and into the active TIMs table.
How Has This Been Tested?
All unit tests were verified to pass. End-to-End testing was preformed to ensure a local deployment is easily spined up and inserted TIMs made their way into the Active TIM Table. Vsl, rw, rc, cc, parking, and incident TIMs were inserted and verified to make their way into the Active TIM table.
Example TIMs sent through:
{ "timParkingList": [ { "direction": "I", "startPoint": { "latitude": 39.613221750299076, "longitude": -104.89570514902218, "valid": true }, "endPoint": { "latitude": 40.73654117648727, "longitude": -104.99349024587299, "valid": true }, "route": "025A_DEC", "roadCode": "1", "advisory": [ "3" ], "segment": "2", "clientId": "testid", "itisCodes": [ "12559" ], "mileMarker": 1.0, "availability": 1, "exit": "12559" } ] }{ "timIncidentList": [ { "direction": "I", "startPoint": { "latitude": 39.613221750299076, "longitude": -104.89570514902218, "valid": true }, "route": "025A_DEC", "roadCode": "1", "advisory": [ "3" ], "segment": "2", "clientId": "testid", "itisCodes": [ "12559" ], "problem": "mudslide", "effect": "test", "action": "stop", "incidentId": "test", "highway": "025A_DEC", "pk": 1 } ] }Types of changes
Checklist: