mirror of https://github.com/apache/nifi.git
NIFI-10451 Updated QuestDB Status History Rollover to work with version 6
This closes #6374 Co-authored-by: David Handermann <exceptionfactory@apache.org> Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
aca93ed3ce
commit
a4a3bdbc98
|
@ -19,8 +19,11 @@ package org.apache.nifi.controller.status.history;
|
|||
import io.questdb.cairo.sql.Record;
|
||||
import io.questdb.cairo.sql.RecordCursor;
|
||||
import io.questdb.cairo.sql.RecordCursorFactory;
|
||||
import io.questdb.griffin.CompiledQuery;
|
||||
import io.questdb.griffin.SqlCompiler;
|
||||
import io.questdb.griffin.SqlExecutionContext;
|
||||
import io.questdb.mp.SCSequence;
|
||||
import io.questdb.mp.TimeoutBlockingWaitStrategy;
|
||||
import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -32,6 +35,7 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
|
@ -44,7 +48,7 @@ public class EmbeddedQuestDbRolloverHandler implements Runnable {
|
|||
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
|
||||
|
||||
// Drop keyword is intentionally not uppercase as the query parser only recognizes it in this way
|
||||
private static final String DELETION_QUERY = "ALTER TABLE %s drop PARTITION '%s'";
|
||||
private static final String DELETION_QUERY = "ALTER TABLE %s DROP PARTITION LIST '%s'";
|
||||
// Distinct keyword is not recognized if the date mapping is not within an inner query
|
||||
static final String SELECTION_QUERY = "SELECT DISTINCT * FROM (SELECT (to_str(capturedAt, 'yyyy-MM-dd')) AS partitionName FROM %s)";
|
||||
|
||||
|
@ -91,7 +95,8 @@ public class EmbeddedQuestDbRolloverHandler implements Runnable {
|
|||
|
||||
private void deletePartition(final CharSequence tableName, final String partition) {
|
||||
try (final SqlCompiler compiler = dbContext.getCompiler()) {
|
||||
compiler.compile(String.format(DELETION_QUERY, new Object[]{tableName, partition}), dbContext.getSqlExecutionContext());
|
||||
final CompiledQuery compile = compiler.compile(String.format(DELETION_QUERY, tableName, partition), dbContext.getSqlExecutionContext());
|
||||
compile.execute(new SCSequence(new TimeoutBlockingWaitStrategy(5, TimeUnit.SECONDS)));
|
||||
} catch (final Exception e) {
|
||||
LOGGER.error("Dropping partition " + partition + " of table " + tableName + " failed", e);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue