NIFI-8289 Refine QuestDB status repository rollover and add time zone support

This closes #4883

Signed-off-by: Joey Frazee <jfrazee@apache.org>
This commit is contained in:
Bence Simon 2021-03-10 22:58:44 -05:00 committed by Joey Frazee
parent b039606cf8
commit 57cca88eea
2 changed files with 181 additions and 82 deletions

View File

@ -25,15 +25,14 @@ import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
/**
* QuestDB does not provide the possibility for deleting individual lines. Instead there is the option to drop older
@ -42,22 +41,28 @@ import java.util.Set;
*/
public class EmbeddedQuestDbRolloverHandler implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedQuestDbRolloverHandler.class);
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
// Drop keyword is intentionally not uppercase as the query parser only recognizes it in this way
private static final String DELETION_QUERY = "ALTER TABLE %s drop PARTITION '%s'";
// Distinct keyword is not recognized if the date mapping is not within an inner query
static final String SELECTION_QUERY = "SELECT DISTINCT * FROM (SELECT (to_str(capturedAt, 'yyyy-MM-dd')) AS partitionName FROM %s)";
static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault());
private final QuestDbContext dbContext;
private final Supplier<ZonedDateTime> timeSource;
private final List<String> tables = new ArrayList<>();
private final int daysToKeepData;
private final QuestDbContext dbContext;
EmbeddedQuestDbRolloverHandler(final Supplier<ZonedDateTime> timeSource, final Collection<String> tables, final int daysToKeepData, final QuestDbContext dbContext) {
this.timeSource = timeSource;
this.tables.addAll(tables);
this.daysToKeepData = daysToKeepData;
this.dbContext = dbContext;
}
public EmbeddedQuestDbRolloverHandler(final Collection<String> tables, final int daysToKeepData, final QuestDbContext dbContext) {
this.tables.addAll(tables);
this.dbContext = dbContext;
this.daysToKeepData = daysToKeepData;
this(() -> ZonedDateTime.now(), tables, daysToKeepData, dbContext);
}
@Override
@ -69,11 +74,13 @@ public class EmbeddedQuestDbRolloverHandler implements Runnable {
private void rolloverTable(final CharSequence tableName) {
try {
final Set<String> partitions = getPartitions(tableName);
final Set<String> partitionToKeep = getPartitionsToKeep();
final List<String> partitions = getPartitions(tableName);
final String oldestPartitionToKeep = getOldestPartitionToKeep();
for (final String partition : partitions) {
if (!partitionToKeep.contains(partition)) {
// The last partition if exists, it is considered as "active partition" and cannot be deleted.
for (int i = 0; i < partitions.size() - 1; i++) {
final String partition = partitions.get(i);
if (oldestPartitionToKeep.compareTo(partition) > 0) {
deletePartition(tableName, partition);
}
}
@ -90,9 +97,9 @@ public class EmbeddedQuestDbRolloverHandler implements Runnable {
}
}
private Set<String> getPartitions(final CharSequence tableName) throws Exception {
private List<String> getPartitions(final CharSequence tableName) throws Exception {
final SqlExecutionContext executionContext = dbContext.getSqlExecutionContext();
final Set<String> result = new HashSet<>();
final List<String> result = new ArrayList<>(daysToKeepData + 1);
try (
final SqlCompiler compiler = dbContext.getCompiler();
@ -105,19 +112,13 @@ public class EmbeddedQuestDbRolloverHandler implements Runnable {
}
}
Collections.sort(result);
return result;
}
private Set<String> getPartitionsToKeep() {
final Instant now = Instant.now();
// Note: as only full partitions might be deleted and the status history repository works with day based partitions,
// a partition must remain until any part of it might be the subject of request.
final Set<String> result = new HashSet<>();
for (int i = 0; i < daysToKeepData + 1; i++) {
result.add(DATE_FORMATTER.format(now.minus(i, ChronoUnit.DAYS)));
}
return result;
private String getOldestPartitionToKeep() {
final ZonedDateTime now = timeSource.get();
final ZonedDateTime utc = now.minusDays(daysToKeepData).withZoneSameInstant(ZoneOffset.UTC);
return utc.format(DATE_FORMATTER);
}
}

View File

@ -35,26 +35,47 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class EmbeddedQuestDbRolloverHandlerTest {
private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedQuestDbRolloverHandlerTest.class);
private static final String PATH_BASE = "target/questdb";
private String CREATE_TABLE = "CREATE TABLE measurements (capturedAt TIMESTAMP, value INT) TIMESTAMP(capturedAt) PARTITION BY DAY";
private static final String CREATE_TABLE = "CREATE TABLE measurements (capturedAt TIMESTAMP, value INT) TIMESTAMP(capturedAt) PARTITION BY DAY";
final Instant now = Instant.now();
private static final String UTC_MAR_1_1200 = "03/01/2021 12:00:00 UTC";
private static final String UTC_MAR_2_1200 = "03/02/2021 12:00:00 UTC";
private static final String UTC_MAR_5_1200 = "03/05/2021 12:00:00 UTC";
private static final String UTC_MAR_6_1200 = "03/06/2021 12:00:00 UTC";
private static final String UTC_MAR_7_1200 = "03/07/2021 12:00:00 UTC";
private static final String UTC_MAR_8_1200 = "03/08/2021 12:00:00 UTC";
private static final String UTC_MAR_8_1700 = "03/08/2021 17:00:00 UTC";
private static final String EST_MAR_5_1200 = "03/05/2021 12:00:00 EST"; // UTC: 03/05/2021 17:00:00
private static final String EST_MAR_6_1200 = "03/06/2021 12:00:00 EST"; // UTC: 03/06/2021 17:00:00
private static final String EST_MAR_7_1200 = "03/07/2021 12:00:00 EST"; // UTC: 03/07/2021 17:00:00
private static final String EST_MAR_8_1200 = "03/08/2021 12:00:00 EST"; // UTC: 03/08/2021 17:00:00
private static final String EST_MAR_8_1600 = "03/08/2021 16:00:00 EST"; // UTC: 03/08/2021 21:00:00
private static final String EST_MAR_8_1700 = "03/08/2021 17:00:00 EST"; // UTC: 03/09/2021 22:00:00
private static final String EST_MAR_8_2200 = "03/08/2021 22:00:00 EST"; // UTC: 03/09/2021 03:00:00
private static final String EST_MAR_8_2300 = "03/08/2021 23:00:00 EST"; // UTC: 03/09/2021 04:00:00
private static final String SGT_MAR_4_1200 = "03/04/2021 12:00:00 SGT"; // UTC: 03/04/2021 04:00:00
private static final String SGT_MAR_5_1200 = "03/05/2021 12:00:00 SGT"; // UTC: 03/05/2021 04:00:00
private static final String SGT_MAR_6_1200 = "03/06/2021 12:00:00 SGT"; // UTC: 03/06/2021 04:00:00
private static final String SGT_MAR_7_1200 = "03/07/2021 12:00:00 SGT"; // UTC: 03/07/2021 04:00:00
private static final String SGT_MAR_8_1200 = "03/08/2021 12:00:00 SGT"; // UTC: 03/08/2021 04:00:00
private static final String SGT_MAR_8_1300 = "03/08/2021 13:00:00 SGT"; // UTC: 03/08/2021 05:00:00
private static final String SGT_MAR_8_2300 = "03/08/2021 23:00:00 SGT"; // UTC: 03/08/2021 15:00:00
private String path;
private QuestDbContext dbContext;
private EmbeddedQuestDbRolloverHandler testSubject;
@Before
public void setUp() throws Exception {
@ -62,7 +83,6 @@ public class EmbeddedQuestDbRolloverHandlerTest {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(new File(path));
dbContext = givenDbContext();
testSubject = new EmbeddedQuestDbRolloverHandler(Collections.singletonList("measurements"), 2, dbContext);
}
@After
@ -75,68 +95,146 @@ public class EmbeddedQuestDbRolloverHandlerTest {
}
@Test
public void testRollOverWhenWithEmptyDatabase() throws Exception {
public void testNoOffsetTimeZoneWhenPartitionNeedsToBeRolled() throws Exception {
// given
givenTableIsCreated(dbContext);
givenTableIsPopulated(UTC_MAR_5_1200, UTC_MAR_6_1200, UTC_MAR_7_1200, UTC_MAR_8_1200);
// when
whenRollOverIsExecuted();
whenRollOverIsExecuted(UTC_MAR_8_1700);
// then
thenRemainingPartitionsAre(Arrays.asList());
thenTheRemainingPartitionsAre("2021-03-06", "2021-03-07", "2021-03-08");
}
@Test
public void testRollOverWhenLessPartitionThanNeeded() throws Exception {
// This scenario might occur when the NiFi was stopped and the persistent storage remains in place
public void testNoOffsetTimeZoneAndNonConsecutive() throws Exception {
// given
givenTableIsCreated(dbContext);
givenTableIsPopulated(givenMeasurementTimes(Arrays.asList(0, 1)));
givenTableIsPopulated(UTC_MAR_1_1200, UTC_MAR_2_1200, UTC_MAR_7_1200, UTC_MAR_8_1200);
// when
whenRollOverIsExecuted();
whenRollOverIsExecuted(UTC_MAR_8_1700);
// then
thenRemainingPartitionsAre(Arrays.asList(0, 1));
thenTheRemainingPartitionsAre("2021-03-07", "2021-03-08");
}
@Test
public void testRollOverWhenNoPartitionToDrop() throws Exception {
public void testNoOffsetTimeWhenNoPartitionsNeedToBeDropped() throws Exception {
// given
givenTableIsCreated(dbContext);
givenTableIsPopulated(givenMeasurementTimes(Arrays.asList(0, 1, 2)));
givenTableIsPopulated(UTC_MAR_6_1200, UTC_MAR_7_1200, UTC_MAR_8_1200);
// when
whenRollOverIsExecuted();
whenRollOverIsExecuted(UTC_MAR_8_1700);
// then
thenRemainingPartitionsAre(Arrays.asList(0, 1, 2));
thenTheRemainingPartitionsAre("2021-03-06", "2021-03-07", "2021-03-08");
}
@Test
public void testRollOverWhenOldPartitionsPresent() throws Exception {
public void testNoOffsetTimeZoneAndLessPartitionThanNeeded() throws Exception {
// given
givenTableIsCreated(dbContext);
givenTableIsPopulated(givenMeasurementTimes(Arrays.asList(0, 1, 2, 3, 4)));
givenTableIsPopulated(UTC_MAR_7_1200, UTC_MAR_8_1200);
// when
whenRollOverIsExecuted();
whenRollOverIsExecuted(UTC_MAR_8_1700);
// then
thenRemainingPartitionsAre(Arrays.asList(0, 1, 2));
thenTheRemainingPartitionsAre("2021-03-07", "2021-03-08");
}
@Test
// This scenario might occurs when the NiFi was stopped and the persistens storage remaing
public void testRollOverWhenNonconsecutivePartitionsPresent() throws Exception {
public void testNoOffsetTimeZoneAndOldPartitionsOnly() throws Exception {
// given
givenTableIsCreated(dbContext);
givenTableIsPopulated(givenMeasurementTimes(Arrays.asList(0, 1, 7, 8, 9)));
givenTableIsPopulated(UTC_MAR_1_1200, UTC_MAR_2_1200);
// when
whenRollOverIsExecuted();
whenRollOverIsExecuted(UTC_MAR_8_1700);
// then - QuestDB will not remove the active partition if presents
thenTheRemainingPartitionsAre("2021-03-02");
}
@Test
public void testNoOffsetTimeZoneAndEmptyDatabase() throws Exception {
// given
givenTableIsCreated(dbContext);
// when
whenRollOverIsExecuted(UTC_MAR_8_1700);
// then
thenRemainingPartitionsAre(Arrays.asList(0, 1));
thenNoPartitionsExpected();
}
@Test
public void testNegativeOffsetTimeZoneWhenOverlaps() throws Exception {
// given
givenTableIsCreated(dbContext);
givenTableIsPopulated(EST_MAR_5_1200, EST_MAR_6_1200, EST_MAR_7_1200, EST_MAR_8_1200, EST_MAR_8_1600);
// when
whenRollOverIsExecuted(EST_MAR_8_1700);
// then
thenTheRemainingPartitionsAre("2021-03-06", "2021-03-07", "2021-03-08");
}
@Test
public void testNegativeOffsetTimeZoneWhenOverlapsAndRolledLater() throws Exception {
// given
givenTableIsCreated(dbContext);
givenTableIsPopulated(EST_MAR_5_1200, EST_MAR_6_1200, EST_MAR_7_1200, EST_MAR_8_1200, EST_MAR_8_1600);
// when
whenRollOverIsExecuted(EST_MAR_8_2300);
// then (there is no data inserted into the time range after the partition 2021-03-08, so 2021-03-09 is not created)
thenTheRemainingPartitionsAre("2021-03-07", "2021-03-08");
}
@Test
public void testNegativeOffsetTimeZoneWhenHangsOver() throws Exception {
// given
givenTableIsCreated(dbContext);
givenTableIsPopulated(EST_MAR_6_1200, EST_MAR_7_1200, EST_MAR_8_1200, EST_MAR_8_2200);
// when
whenRollOverIsExecuted(EST_MAR_8_2300);
// then
thenTheRemainingPartitionsAre("2021-03-07", "2021-03-08", "2021-03-09");
}
@Test
public void testPositiveOffsetTimeZoneWhenOverlaps() throws Exception {
// given
givenTableIsCreated(dbContext);
givenTableIsPopulated(SGT_MAR_4_1200, SGT_MAR_5_1200, SGT_MAR_6_1200, SGT_MAR_7_1200, SGT_MAR_8_1200);
// when
whenRollOverIsExecuted(SGT_MAR_8_1300);
// then
thenTheRemainingPartitionsAre("2021-03-06", "2021-03-07", "2021-03-08");
}
@Test
public void testPositiveOffsetTimeZoneWhenHangsOver() throws Exception {
// given
givenTableIsCreated(dbContext);
givenTableIsPopulated(SGT_MAR_4_1200, SGT_MAR_5_1200, SGT_MAR_6_1200, SGT_MAR_7_1200, SGT_MAR_8_1200);
// when
whenRollOverIsExecuted(SGT_MAR_8_2300);
// then
thenTheRemainingPartitionsAre("2021-03-06", "2021-03-07", "2021-03-08");
}
private QuestDbContext givenDbContext() {
@ -149,43 +247,39 @@ public class EmbeddedQuestDbRolloverHandlerTest {
dbContext.getCompiler().compile(CREATE_TABLE, dbContext.getSqlExecutionContext());
}
private void givenTableIsPopulated(final List<Long> givenMeasurementTimes) {
private void givenTableIsPopulated(final String... dates) throws Exception {
int value = 0;
for (final String date : dates) {
givenTableIsPopulated(date, value++);
}
}
private void givenTableIsPopulated(final String date, final int value) throws Exception {
final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MM/dd/yyyy HH:mm:ss z");
final ZonedDateTime parsedDate = ZonedDateTime.parse(date, formatter);
final SqlExecutionContext executionContext = dbContext.getSqlExecutionContext();
final TableWriter tableWriter = dbContext.getEngine().getWriter(executionContext.getCairoSecurityContext(), "measurements");
for (int i = 0; i < givenMeasurementTimes.size(); i++) {
final TableWriter.Row row = tableWriter.newRow(TimeUnit.MILLISECONDS.toMicros(givenMeasurementTimes.get(i)));
row.putTimestamp(0, TimeUnit.MILLISECONDS.toMicros(givenMeasurementTimes.get(i)));
row.putInt(1, i);
row.append();
}
final TableWriter.Row row = tableWriter.newRow(TimeUnit.MILLISECONDS.toMicros(parsedDate.toInstant().toEpochMilli()));
row.putInt(1, value);
row.append();
tableWriter.commit();
tableWriter.close();
}
private List<Long> givenMeasurementTimes(final List<Integer> daysBack) {
final List<Long> result = new LinkedList<>();
private void whenRollOverIsExecuted(final String executedAt) throws Exception {
final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MM/dd/yyyy HH:mm:ss z");
final ZonedDateTime executionTime = ZonedDateTime.parse(executedAt, formatter);
for (final Integer day : daysBack) {
result.add(now.minus(day, ChronoUnit.DAYS).toEpochMilli());
}
result.sort((l1, l2) -> l1.compareTo(l2));
return result;
}
private void whenRollOverIsExecuted() {
final Supplier<ZonedDateTime> timeSource = () -> executionTime;
final EmbeddedQuestDbRolloverHandler testSubject = new EmbeddedQuestDbRolloverHandler(timeSource, Collections.singletonList("measurements"), 2, dbContext);
testSubject.run();
}
private void thenRemainingPartitionsAre(final List<Integer> expectedDays) throws Exception {
final List<String> expectedPartitions = new ArrayList<>(expectedDays.size());
for (final Integer expectedDay : expectedDays) {
expectedPartitions.add(EmbeddedQuestDbRolloverHandler.DATE_FORMATTER.format(now.minus(expectedDay, ChronoUnit.DAYS)));
}
private void thenTheRemainingPartitionsAre(final String... expectedPartitions) throws Exception {
final SqlExecutionContext executionContext = dbContext.getSqlExecutionContext();
final RecordCursorFactory cursorFactory = dbContext.getCompiler()
.compile(String.format(EmbeddedQuestDbRolloverHandler.SELECTION_QUERY, "measurements"), executionContext).getRecordCursorFactory();
@ -198,10 +292,14 @@ public class EmbeddedQuestDbRolloverHandlerTest {
existingPartitions.add(new StringBuilder(record.getStr(0)).toString());
}
Assert.assertEquals(expectedPartitions.size(), existingPartitions.size());
Assert.assertEquals(expectedPartitions.length, existingPartitions.size());
for (final String expectedPartition : expectedPartitions) {
Assert.assertTrue("Partition " + expectedPartition + " is expected", existingPartitions.contains(expectedPartition));
}
}
}
private void thenNoPartitionsExpected() throws Exception {
thenTheRemainingPartitionsAre();
}
}