NIFI-6180: exposing firehose grace period to DruidTranquilityController

this allows for configuring `druidBeam.firehoseGracePeriod`
(https://github.com/druid-io/tranquility/blob/master/docs/configuration.md#properties)

NIFI-6180: Corrected typo in DruidTranquilityController
Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #3403
This commit is contained in:
Endre Zoltan Kovacs 2019-04-03 16:35:04 +02:00 committed by Matthew Burgess
parent e907b689e8
commit 3a63de2ae2
2 changed files with 17 additions and 3 deletions

View File

@ -80,6 +80,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
private final static String FIREHOSE_PATTERN = "druid:firehose:%s";
private final static AllowableValue PT1M = new AllowableValue("PT1M", "1 minute", "1 minute");
private final static AllowableValue PT5M = new AllowableValue("PT5M", "5 minutes", "5 minutes");
private final static AllowableValue PT10M = new AllowableValue("PT10M", "10 minutes", "10 minutes");
private final static AllowableValue PT60M = new AllowableValue("PT60M", "60 minutes", "1 hour");
@ -276,6 +277,16 @@ public class DruidTranquilityController extends AbstractControllerService implem
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor FIREHOSE_GRACE_PERIOD = new PropertyDescriptor.Builder()
.name("druid-cs-firehose-grace-period")
.displayName("Firehose Grace Period")
.description("An additional grace period, after the \"Late Event Grace Period\" (window period) has elapsed, but before the indexing task is shut down.")
.required(true)
.allowableValues(PT1M, PT5M, PT10M, PT60M)
.defaultValue(PT5M.getValue())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("druid-cs-batch-size")
.displayName("Batch Size")
@ -332,6 +343,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
props.add(QUERY_GRANULARITY);
props.add(INDEX_RETRY_PERIOD);
props.add(WINDOW_PERIOD);
props.add(FIREHOSE_GRACE_PERIOD);
props.add(TIMESTAMP_FIELD);
props.add(MAX_BATCH_SIZE);
props.add(MAX_PENDING_BATCHES);
@ -378,6 +390,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
final String segmentGranularity = context.getProperty(SEGMENT_GRANULARITY).getValue();
final String queryGranularity = context.getProperty(QUERY_GRANULARITY).getValue();
final String windowPeriod = context.getProperty(WINDOW_PERIOD).getValue();
final String firehoseGracePeriod = context.getProperty(FIREHOSE_GRACE_PERIOD).getValue();
final String indexRetryPeriod = context.getProperty(INDEX_RETRY_PERIOD).getValue();
final String aggregatorJSON = context.getProperty(AGGREGATOR_JSON).evaluateAttributeExpressions().getValue();
final String dimensionsStringList = context.getProperty(DIMENSIONS_LIST).evaluateAttributeExpressions().getValue();
@ -416,7 +429,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
final TimestampSpec timestampSpec = new TimestampSpec(timestampField, "auto", null);
final Beam<Map<String, Object>> beam = buildBeam(dataSource, indexService, discoveryPath, clusterPartitions, clusterReplication,
segmentGranularity, queryGranularity, windowPeriod, indexRetryPeriod, dimensions, aggregator, timestamper, timestampSpec);
segmentGranularity, queryGranularity, windowPeriod, firehoseGracePeriod, indexRetryPeriod, dimensions, aggregator, timestamper, timestampSpec);
tranquilizer = buildTranquilizer(maxBatchSize, maxPendingBatches, lingerMillis, beam);
@ -433,7 +446,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
}
Beam<Map<String, Object>> buildBeam(String dataSource, String indexService, String discoveryPath, int clusterPartitions, int clusterReplication,
String segmentGranularity, String queryGranularity, String windowPeriod, String indexRetryPeriod, List<String> dimensions,
String segmentGranularity, String queryGranularity, String windowPeriod, String firehoseGracePeriod, String indexRetryPeriod, List<String> dimensions,
List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) {
return DruidBeams.builder(timestamper)
.curator(curator)
@ -454,6 +467,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
DruidBeamConfig
.builder()
.indexRetryPeriod(new Period(indexRetryPeriod))
.firehoseGracePeriod(new Period(firehoseGracePeriod))
.build())
.buildBeam();
}

View File

@ -137,7 +137,7 @@ public class MockDruidTranquilityController extends DruidTranquilityController {
@SuppressWarnings("unchecked")
@Override
Beam<Map<String, Object>> buildBeam(String dataSource, String indexService, String discoveryPath, int clusterPartitions, int clusterReplication,
String segmentGranularity, String queryGranularity, String windowPeriod, String indexRetryPeriod, List<String> dimensions,
String segmentGranularity, String queryGranularity, String windowPeriod, String firehoseGracePeriod, String indexRetryPeriod, List<String> dimensions,
List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) {
return mock(Beam.class);
}