Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected DBConnectorPath getDBConnectorPath(String path) {

@Override
protected SchemaReader getSchemaReader(String sessionID) {
return new OracleSourceSchemaReader(sessionID);
return new OracleSourceSchemaReader(sessionID, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected String createConnectionString() {

@Override
protected SchemaReader getSchemaReader() {
return new OracleSourceSchemaReader();
return new OracleSourceSchemaReader(null, oracleSourceConfig.shouldTreatAsOldTimestamp());
}

@Override
Expand Down Expand Up @@ -101,6 +101,7 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig {
public static final String NAME_CONNECTION = "connection";
public static final String DEFAULT_ROW_PREFETCH_VALUE = "40";
public static final String DEFAULT_BATCH_SIZE = "10";
public static final String TREAT_AS_OLD_TIMESTAMP = "treatAsOldTimestamp";

@Name(NAME_USE_CONNECTION)
@Nullable
Expand All @@ -123,11 +124,19 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig {
@Nullable
private Integer defaultRowPrefetch;

@Name(TREAT_AS_OLD_TIMESTAMP)
@Description("For internal use only. If set to true, DATETIME types will be treated as TIMESTAMP_MICROS to maintain"
+ "backward compatibility.")
@Macro
@Nullable
private Boolean treatAsOldTimestamp = false;


public OracleSourceConfig(String host, int port, String user, String password, String jdbcPluginName,
String connectionArguments, String connectionType, String database, String role,
int defaultBatchValue, int defaultRowPrefetch,
String importQuery, Integer numSplits, int fetchSize,
String boundingQuery, String splitBy, Boolean useSSL) {
String boundingQuery, String splitBy, Boolean useSSL, Boolean treatAsOldTimestamp) {
this.connection = new OracleConnectorConfig(host, port, user, password, jdbcPluginName, connectionArguments,
connectionType, database, role, useSSL);
this.defaultBatchValue = defaultBatchValue;
Expand All @@ -137,6 +146,7 @@ public OracleSourceConfig(String host, int port, String user, String password, S
this.numSplits = numSplits;
this.boundingQuery = boundingQuery;
this.splitBy = splitBy;
this.treatAsOldTimestamp = treatAsOldTimestamp;
}

@Override
Expand All @@ -163,6 +173,10 @@ public OracleConnectorConfig getConnection() {
return connection;
}

public boolean shouldTreatAsOldTimestamp() {
return Boolean.TRUE.equals(treatAsOldTimestamp);
}

@Override
public void validate(FailureCollector collector) {
ConfigUtil.validateConnection(this, useConnection, connection, collector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,16 @@ public class OracleSourceSchemaReader extends CommonSchemaReader {

private final String sessionID;

private final boolean treatAsOldTimestamp;

public OracleSourceSchemaReader() {
this(null);
this(null, false);
}

public OracleSourceSchemaReader(String sessionID) {
public OracleSourceSchemaReader(String sessionID, boolean treatAsOldTimestamp) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you have to call it in oracle source class to make it work

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

super();
this.sessionID = sessionID;
this.treatAsOldTimestamp = treatAsOldTimestamp;
}

@Override
Expand All @@ -83,8 +86,17 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
case TIMESTAMP_TZ:
return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
case Types.TIMESTAMP:
Copy link

@vikasrathee-cs vikasrathee-cs May 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For timestamp it should be return Schema.of(Schema.LogicalType.DATETIME); always. handle both cases separately

case TIMESTAMP_LTZ:
return Schema.of(Schema.LogicalType.DATETIME);
case TIMESTAMP_LTZ:
// TIMESTAMP_LTZ (Local timezone timestamp)
// - Legacy behavior used TIMESTAMP_MICROS
// - New behavior uses DATETIME for accurate semantic representation
// Use treatAsOldTimestamp flag to ensure backward compatibility
if (treatAsOldTimestamp) {
return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment mentioning why are we doing it

} else {
return Schema.of(Schema.LogicalType.DATETIME);
}
case BINARY_FLOAT:
return Schema.of(Schema.Type.FLOAT);
case BINARY_DOUBLE:
Expand Down
9 changes: 9 additions & 0 deletions oracle-plugin/widgets/Oracle-batchsource.json
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@
"default": "40",
"min": "1"
}
},
{
"widget-type": "hidden",
"label": "Treat As Old Timestamp",
"name": "treatAsOldTimestamp",
"description": "For internal use only. If set to true, DATETIME types will be treated as TIMESTAMP_MICROS to maintain backward compatibility.",
"widget-attributes": {
"default": "false"
}
}
]
}
Expand Down