report hand off count finite appenderator driver (#3925)

This commit is contained in:
Parag Jain 2017-02-13 12:41:24 -06:00 committed by Gian Merlino
parent b7a88706f3
commit 8e31a465ad
4 changed files with 20 additions and 8 deletions

View File

@ -281,7 +281,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
try (
final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
final FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox);
final FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
) {
appenderator = appenderator0;
@ -841,7 +841,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private FiniteAppenderatorDriver newDriver(
final Appenderator appenderator,
final TaskToolbox toolbox
final TaskToolbox toolbox,
final FireDepartmentMetrics metrics
)
{
return new FiniteAppenderatorDriver(
@ -851,7 +852,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
toolbox.getObjectMapper(),
tuningConfig.getMaxRowsPerSegment(),
tuningConfig.getHandoffConditionTimeout()
tuningConfig.getHandoffConditionTimeout(),
metrics
);
}

View File

@ -385,7 +385,7 @@ public class IndexTask extends AbstractTask
try (
final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema);
final FiniteAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator);
final FiniteAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator, fireDepartmentMetrics);
final Firehose firehose = firehoseFactory.connect(dataSchema.getParser())
) {
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
@ -504,7 +504,8 @@ public class IndexTask extends AbstractTask
private FiniteAppenderatorDriver newDriver(
final Appenderator appenderator,
final TaskToolbox toolbox,
final SegmentAllocator segmentAllocator
final SegmentAllocator segmentAllocator,
final FireDepartmentMetrics metrics
)
{
return new FiniteAppenderatorDriver(
@ -514,7 +515,8 @@ public class IndexTask extends AbstractTask
new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
toolbox.getObjectMapper(),
Integer.MAX_VALUE, // rows for a partition is already determined by the shardSpec
0
0,
metrics
);
}

View File

@ -41,6 +41,7 @@ import io.druid.data.input.InputRow;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.timeline.DataSegment;
@ -77,6 +78,7 @@ public class FiniteAppenderatorDriver implements Closeable
private final ObjectMapper objectMapper;
private final int maxRowsPerSegment;
private final long handoffConditionTimeout;
private final FireDepartmentMetrics metrics;
// All access to "activeSegments" and "lastSegmentId" must be synchronized on "activeSegments".
@ -100,6 +102,7 @@ public class FiniteAppenderatorDriver implements Closeable
* @param maxRowsPerSegment maximum number of rows allowed in an entire segment (not a single persist)
* @param handoffConditionTimeout maximum number of millis allowed for handoff (not counting push/publish), zero
* means wait forever.
* @param metrics Firedepartment metrics
*/
public FiniteAppenderatorDriver(
Appenderator appenderator,
@ -108,7 +111,8 @@ public class FiniteAppenderatorDriver implements Closeable
UsedSegmentChecker usedSegmentChecker,
ObjectMapper objectMapper,
int maxRowsPerSegment,
long handoffConditionTimeout
long handoffConditionTimeout,
FireDepartmentMetrics metrics
)
{
this.appenderator = Preconditions.checkNotNull(appenderator, "appenderator");
@ -119,6 +123,7 @@ public class FiniteAppenderatorDriver implements Closeable
this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
this.maxRowsPerSegment = maxRowsPerSegment;
this.handoffConditionTimeout = handoffConditionTimeout;
this.metrics = Preconditions.checkNotNull(metrics, "metrics");
}
/**
@ -469,6 +474,7 @@ public class FiniteAppenderatorDriver implements Closeable
{
final SegmentIdentifier identifier = SegmentIdentifier.fromDataSegment(dataSegment);
log.info("Segment[%s] successfully handed off, dropping.", identifier);
metrics.incrementHandOffCount();
final ListenableFuture<?> dropFuture = appenderator.drop(identifier);
Futures.addCallback(
dropFuture,

View File

@ -36,6 +36,7 @@ import io.druid.data.input.MapBasedInputRow;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Granularity;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.timeline.DataSegment;
@ -102,7 +103,8 @@ public class FiniteAppenderatorDriverTest
new TestUsedSegmentChecker(),
OBJECT_MAPPER,
MAX_ROWS_PER_SEGMENT,
HANDOFF_CONDITION_TIMEOUT
HANDOFF_CONDITION_TIMEOUT,
new FireDepartmentMetrics()
);
}