NIFI-13529 Set Calcite Connection timeZone to UTC for Records

Calcite adjusts Timestamp objects returned from a Result Set based on the configured Time Zone Offset in an attempt to localize the results. Framework use of Calcite for Record processing expects input Timestamp values to remain unchanged, so setting the timeZone property to UTC with an offset of 0 effectively avoids this Calcite localization.

This closes #9066

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2024-07-09 11:01:41 -04:00 committed by exceptionfactory
parent 5193436a53
commit f338215851
No known key found for this signature in database
2 changed files with 44 additions and 0 deletions

View File

@ -193,6 +193,10 @@ public class CalciteDatabase implements Closeable {
calciteProperties = properties;
}
// If not explicitly set, default timezone to UTC. We ensure that when we provide timestamps, we convert them to UTC. We don't want
// Calcite trying to convert them again.
calciteProperties.putIfAbsent("timeZone", "UTC");
final Connection sqlConnection = DriverManager.getConnection("jdbc:calcite:", calciteProperties);
final CalciteConnection connection = sqlConnection.unwrap(CalciteConnection.class);
connection.getRootSchema().setCacheEnabled(false);

View File

@ -24,6 +24,8 @@ import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -85,6 +87,25 @@ public class TestCalciteDatabase {
}
}
@Test
public void testWithTimestamp() throws SQLException, IOException {
final String query = "SELECT * FROM CANNED_DATA";
try (final CalciteDatabase database = createNameTimestampDatabase();
final PreparedStatement stmt = database.getConnection().prepareStatement(query);
final ResultSet resultSet = stmt.executeQuery()) {
assertTrue(resultSet.next());
// We should get the same result whether we call getTimestamp() or getObject(). We should also get back the same original Long value.
final Timestamp timestamp = resultSet.getTimestamp(2);
assertEquals(timestamp, resultSet.getObject(2));
assertEquals(1704056400000L, timestamp.getTime());
assertFalse(resultSet.next());
}
}
public static class ToUpperCase {
public String invoke(final String value) {
return value.toUpperCase();
@ -113,6 +134,25 @@ public class TestCalciteDatabase {
return database;
}
private CalciteDatabase createNameTimestampDatabase() throws SQLException {
final CalciteDatabase database = new CalciteDatabase();
final NiFiTableSchema tableSchema = new NiFiTableSchema(List.of(
new ColumnSchema("name", String.class, false),
new ColumnSchema("dob", Timestamp.class, false)
));
final List<Object[]> rows = new ArrayList<>();
rows.add(new Object[] {"Mark", new Timestamp(1704056400000L)});
final ListDataSource arrayListDataSource = new ListDataSource(tableSchema, rows);
final NiFiTable table = new NiFiTable("CANNED_DATA", arrayListDataSource, mock(ComponentLog.class));
database.addTable(table);
return database;
}
private static class ListDataSource implements ResettableDataSource {
private final NiFiTableSchema schema;