Modify Plumbers in these ways,

1) Persist using Committer instead of Runnable. (Although the metadata object
   is ignored in this patch)

2) Remove the getSink method.

3) Plumbers are now responsible for time-based and hydrant-full-based periodic
   committing. (FireChief, RealtimeIndexTask, and IndexTask used to do this)
This commit is contained in:
Gian Merlino 2015-07-25 08:29:02 -07:00
parent 4546652b3b
commit 062a47fba4
10 changed files with 266 additions and 217 deletions

View File

@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
@ -106,17 +107,23 @@ public class YeOldePlumberSchool implements PlumberSchool
} }
@Override @Override
public int add(InputRow row) throws IndexSizeExceededException public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
{ {
Sink sink = getSink(row.getTimestampFromEpoch()); Sink sink = getSink(row.getTimestampFromEpoch());
if (sink == null) { if (sink == null) {
return -1; return -1;
} }
return sink.add(row); final int numRows = sink.add(row);
if (!sink.canAppendRow()) {
persist(committerSupplier.get());
}
return numRows;
} }
public Sink getSink(long timestamp) private Sink getSink(long timestamp)
{ {
if (theSink.getInterval().contains(timestamp)) { if (theSink.getInterval().contains(timestamp)) {
return theSink; return theSink;
@ -132,10 +139,10 @@ public class YeOldePlumberSchool implements PlumberSchool
} }
@Override @Override
public void persist(Runnable commitRunnable) public void persist(Committer committer)
{ {
spillIfSwappable(); spillIfSwappable();
commitRunnable.run(); committer.run();
} }
@Override @Override
@ -229,12 +236,6 @@ public class YeOldePlumberSchool implements PlumberSchool
{ {
return new File(persistDir, String.format("spill%d", n)); return new File(persistDir, String.format("spill%d", n));
} }
@Override
public void persist(Committer commitRunnable)
{
persist(commitRunnable);
}
}; };
} }
} }

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -34,6 +35,7 @@ import com.google.common.hash.Hashing;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators; import com.metamx.common.guava.Comparators;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
@ -53,6 +55,7 @@ import io.druid.segment.indexing.TuningConfig;
import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.Committers;
import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.Plumber;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec; import io.druid.timeline.partition.HashBasedNumberedShardSpec;
@ -117,18 +120,18 @@ public class IndexTask extends AbstractFixedIntervalTask
); );
} }
static RealtimeTuningConfig convertTuningConfig(ShardSpec spec, IndexTuningConfig config) static RealtimeTuningConfig convertTuningConfig(ShardSpec shardSpec, int rowFlushBoundary, IndexSpec indexSpec)
{ {
return new RealtimeTuningConfig( return new RealtimeTuningConfig(
config.getRowFlushBoundary(), rowFlushBoundary,
null, null,
null, null,
null, null,
null, null,
null, null,
null, null,
spec, shardSpec,
config.getIndexSpec(), indexSpec,
null, null,
null, null,
null, null,
@ -340,9 +343,15 @@ public class IndexTask extends AbstractFixedIntervalTask
} }
}; };
// rowFlushBoundary for this job
final int myRowFlushBoundary = rowFlushBoundary > 0
? rowFlushBoundary
: toolbox.getConfig().getDefaultRowFlushBoundary();
// Create firehose + plumber // Create firehose + plumber
final FireDepartmentMetrics metrics = new FireDepartmentMetrics(); final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
final Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser()); final Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser());
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
final Plumber plumber = new YeOldePlumberSchool( final Plumber plumber = new YeOldePlumberSchool(
interval, interval,
version, version,
@ -350,14 +359,10 @@ public class IndexTask extends AbstractFixedIntervalTask
tmpDir tmpDir
).findPlumber( ).findPlumber(
schema, schema,
convertTuningConfig(shardSpec, ingestionSchema.getTuningConfig()), convertTuningConfig(shardSpec, myRowFlushBoundary, ingestionSchema.getTuningConfig().getIndexSpec()),
metrics metrics
); );
// rowFlushBoundary for this job
final int myRowFlushBoundary = rowFlushBoundary > 0
? rowFlushBoundary
: toolbox.getConfig().getDefaultRowFlushBoundary();
final QueryGranularity rollupGran = ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity(); final QueryGranularity rollupGran = ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity();
try { try {
plumber.startJob(); plumber.startJob();
@ -366,7 +371,7 @@ public class IndexTask extends AbstractFixedIntervalTask
final InputRow inputRow = firehose.nextRow(); final InputRow inputRow = firehose.nextRow();
if (shouldIndex(shardSpec, interval, inputRow, rollupGran)) { if (shouldIndex(shardSpec, interval, inputRow, rollupGran)) {
int numRows = plumber.add(inputRow); int numRows = plumber.add(inputRow, committerSupplier);
if (numRows == -1) { if (numRows == -1) {
throw new ISE( throw new ISE(
String.format( String.format(
@ -376,10 +381,6 @@ public class IndexTask extends AbstractFixedIntervalTask
); );
} }
metrics.incrementProcessed(); metrics.incrementProcessed();
if (numRows >= myRowFlushBoundary) {
plumber.persist(firehose.commit());
}
} else { } else {
metrics.incrementThrownAway(); metrics.incrementThrownAway();
} }
@ -389,7 +390,7 @@ public class IndexTask extends AbstractFixedIntervalTask
firehose.close(); firehose.close();
} }
plumber.persist(firehose.commit()); plumber.persist(committerSupplier.get());
try { try {
plumber.finishJob(); plumber.finishJob();

View File

@ -20,12 +20,14 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.metamx.common.guava.CloseQuietly; import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.parsers.ParseException; import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLock;
@ -46,15 +48,15 @@ import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.SegmentPublisher; import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.plumber.Committers;
import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.RealtimePlumberSchool; import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.segment.realtime.plumber.VersioningPolicy; import io.druid.segment.realtime.plumber.VersioningPolicy;
import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.Period;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -78,8 +80,8 @@ public class RealtimeIndexTask extends AbstractTask
static String makeTaskId(String dataSource, int partitionNumber, DateTime timestamp, int randomBits) static String makeTaskId(String dataSource, int partitionNumber, DateTime timestamp, int randomBits)
{ {
final StringBuilder suffix = new StringBuilder(8); final StringBuilder suffix = new StringBuilder(8);
for(int i = 0; i < Ints.BYTES * 2; ++i) { for (int i = 0; i < Ints.BYTES * 2; ++i) {
suffix.append((char)('a' + ((randomBits >>> (i * 4)) & 0x0F))); suffix.append((char) ('a' + ((randomBits >>> (i * 4)) & 0x0F)));
} }
return String.format( return String.format(
"index_realtime_%s_%d_%s_%s", "index_realtime_%s_%d_%s_%s",
@ -248,8 +250,8 @@ public class RealtimeIndexTask extends AbstractTask
DataSchema dataSchema = spec.getDataSchema(); DataSchema dataSchema = spec.getDataSchema();
RealtimeIOConfig realtimeIOConfig = spec.getIOConfig(); RealtimeIOConfig realtimeIOConfig = spec.getIOConfig();
RealtimeTuningConfig tuningConfig = spec.getTuningConfig() RealtimeTuningConfig tuningConfig = spec.getTuningConfig()
.withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")) .withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist"))
.withVersioningPolicy(versioningPolicy); .withVersioningPolicy(versioningPolicy);
final FireDepartment fireDepartment = new FireDepartment( final FireDepartment fireDepartment = new FireDepartment(
dataSchema, dataSchema,
@ -263,7 +265,7 @@ public class RealtimeIndexTask extends AbstractTask
// NOTE: that redundant realtime tasks will upload to the same location. This can cause index.zip and // NOTE: that redundant realtime tasks will upload to the same location. This can cause index.zip and
// NOTE: descriptor.json to mismatch, or it can cause historical nodes to load different instances of the // NOTE: descriptor.json to mismatch, or it can cause historical nodes to load different instances of the
// NOTE: "same" segment. // NOTE: "same" segment.
final RealtimePlumberSchool plumberSchool = new RealtimePlumberSchool( final PlumberSchool plumberSchool = new RealtimePlumberSchool(
toolbox.getEmitter(), toolbox.getEmitter(),
toolbox.getQueryRunnerFactoryConglomerate(), toolbox.getQueryRunnerFactoryConglomerate(),
toolbox.getSegmentPusher(), toolbox.getSegmentPusher(),
@ -277,6 +279,7 @@ public class RealtimeIndexTask extends AbstractTask
// Delay firehose connection to avoid claiming input resources while the plumber is starting up. // Delay firehose connection to avoid claiming input resources while the plumber is starting up.
Firehose firehose = null; Firehose firehose = null;
Supplier<Committer> committerSupplier = null;
try { try {
plumber.startJob(); plumber.startJob();
@ -285,11 +288,10 @@ public class RealtimeIndexTask extends AbstractTask
toolbox.getMonitorScheduler().addMonitor(metricsMonitor); toolbox.getMonitorScheduler().addMonitor(metricsMonitor);
// Set up firehose // Set up firehose
final Period intermediatePersistPeriod = spec.getTuningConfig().getIntermediatePersistPeriod();
firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser()); firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser());
committerSupplier = Committers.supplierFromFirehose(firehose);
// Time to read data! // Time to read data!
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) { while (firehose.hasMore()) {
final InputRow inputRow; final InputRow inputRow;
@ -308,25 +310,14 @@ public class RealtimeIndexTask extends AbstractTask
continue; continue;
} }
int currCount = plumber.add(inputRow); int numRows = plumber.add(inputRow, committerSupplier);
if (currCount == -1) { if (numRows == -1) {
fireDepartment.getMetrics().incrementThrownAway(); fireDepartment.getMetrics().incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow); log.debug("Throwing away event[%s]", inputRow);
if (System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
continue; continue;
} }
fireDepartment.getMetrics().incrementProcessed(); fireDepartment.getMetrics().incrementProcessed();
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
} }
} }
catch (Throwable e) { catch (Throwable e) {
@ -338,7 +329,7 @@ public class RealtimeIndexTask extends AbstractTask
finally { finally {
if (normalExit) { if (normalExit) {
try { try {
plumber.persist(firehose.commit()); plumber.persist(committerSupplier.get());
plumber.finishJob(); plumber.finishJob();
} }
catch (Exception e) { catch (Exception e) {

View File

@ -18,7 +18,6 @@
package io.druid.indexing.common.task; package io.druid.indexing.common.task;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.metamx.common.Granularity; import com.metamx.common.Granularity;
import io.druid.data.input.impl.CSVParseSpec; import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.DimensionsSpec;
@ -308,7 +307,11 @@ public class IndexTaskTest
null, null,
new IndexSpec() new IndexSpec()
); );
RealtimeTuningConfig realtimeTuningConfig = IndexTask.convertTuningConfig(spec, config); RealtimeTuningConfig realtimeTuningConfig = IndexTask.convertTuningConfig(
spec,
config.getRowFlushBoundary(),
config.getIndexSpec()
);
Assert.assertEquals(realtimeTuningConfig.getMaxRowsInMemory(), config.getRowFlushBoundary()); Assert.assertEquals(realtimeTuningConfig.getMaxRowsInMemory(), config.getRowFlushBoundary());
Assert.assertEquals(realtimeTuningConfig.getShardSpec(), spec); Assert.assertEquals(realtimeTuningConfig.getShardSpec(), spec);
Assert.assertEquals(realtimeTuningConfig.getIndexSpec(), indexSpec); Assert.assertEquals(realtimeTuningConfig.getIndexSpec(), indexSpec);

View File

@ -21,6 +21,7 @@ package io.druid.segment.realtime;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@ -48,11 +49,9 @@ import io.druid.query.UnionQueryRunner;
import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.plumber.Committers;
import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Sink;
import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.Period;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -304,7 +303,6 @@ public class RealtimeManager implements QuerySegmentWalker
private void runFirehoseV2(FirehoseV2 firehose) private void runFirehoseV2(FirehoseV2 firehose)
{ {
final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod();
try { try {
firehose.start(); firehose.start();
} }
@ -312,8 +310,9 @@ public class RealtimeManager implements QuerySegmentWalker
log.error(e, "Failed to start firehoseV2"); log.error(e, "Failed to start firehoseV2");
return; return;
} }
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
log.info("FirehoseV2 started with nextFlush [%s]", nextFlush); log.info("FirehoseV2 started");
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehoseV2(firehose);
boolean haveRow = true; boolean haveRow = true;
while (haveRow) { while (haveRow) {
InputRow inputRow = null; InputRow inputRow = null;
@ -321,14 +320,7 @@ public class RealtimeManager implements QuerySegmentWalker
try { try {
inputRow = firehose.currRow(); inputRow = firehose.currRow();
if (inputRow != null) { if (inputRow != null) {
try { numRows = plumber.add(inputRow, committerSupplier);
numRows = plumber.add(inputRow);
}
catch (IndexSizeExceededException e) {
log.debug(e, "Index limit exceeded: %s", e.getMessage());
nextFlush = doIncrementalPersist(firehose.makeCommitter(), intermediatePersistPeriod);
continue;
}
if (numRows < 0) { if (numRows < 0) {
metrics.incrementThrownAway(); metrics.incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow); log.debug("Throwing away event[%s]", inputRow);
@ -351,88 +343,50 @@ public class RealtimeManager implements QuerySegmentWalker
catch (Exception e) { catch (Exception e) {
log.debug(e, "exception in firehose.advance(), considering unparseable row"); log.debug(e, "exception in firehose.advance(), considering unparseable row");
metrics.incrementUnparseable(); metrics.incrementUnparseable();
continue;
}
try {
final Sink sink = inputRow != null ? plumber.getSink(inputRow.getTimestampFromEpoch()) : null;
if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) {
nextFlush = doIncrementalPersist(firehose.makeCommitter(), intermediatePersistPeriod);
}
}
catch (Exception e) {
log.makeAlert(
e,
"An exception happened while queue to persist!? We hope it is transient. Ignore and continue."
);
} }
} }
} }
private long doIncrementalPersist(Committer committer, Period intermediatePersistPeriod)
{
plumber.persist(committer);
return new DateTime().plus(intermediatePersistPeriod).getMillis();
}
private void runFirehose(Firehose firehose) private void runFirehose(Firehose firehose)
{ {
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod();
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) { while (firehose.hasMore()) {
InputRow inputRow = null; final InputRow inputRow;
try { try {
try { inputRow = firehose.nextRow();
inputRow = firehose.nextRow();
if (inputRow == null) { if (inputRow == null) {
log.debug("thrown away null input row, considering unparseable"); log.debug("thrown away null input row, considering unparseable");
metrics.incrementUnparseable();
continue;
}
}
catch (Exception e) {
log.debug(e, "thrown away line due to exception, considering unparseable");
metrics.incrementUnparseable(); metrics.incrementUnparseable();
continue; continue;
} }
boolean lateEvent = false;
boolean indexLimitExceeded = false;
try {
lateEvent = plumber.add(inputRow) == -1;
}
catch (IndexSizeExceededException e) {
log.info("Index limit exceeded: %s", e.getMessage());
indexLimitExceeded = true;
}
if (indexLimitExceeded || lateEvent) {
metrics.incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
if (indexLimitExceeded || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
continue;
}
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
metrics.incrementProcessed();
} }
catch (ParseException e) { catch (ParseException e) {
if (inputRow != null) { log.debug(e, "thrown away line due to exception, considering unparseable");
log.error(e, "unparseable line: %s", inputRow);
}
metrics.incrementUnparseable(); metrics.incrementUnparseable();
continue;
} }
boolean lateEvent = false;
boolean indexLimitExceeded = false;
try {
lateEvent = plumber.add(inputRow, committerSupplier) == -1;
}
catch (IndexSizeExceededException e) {
log.info("Index limit exceeded: %s", e.getMessage());
indexLimitExceeded = true;
}
if (indexLimitExceeded || lateEvent) {
metrics.incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
if (indexLimitExceeded) {
plumber.persist(committerSupplier.get());
}
continue;
}
metrics.incrementProcessed();
} }
} }

View File

@ -0,0 +1,118 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.plumber;
import com.google.common.base.Supplier;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseV2;
public class Committers
{
private static final Committer NIL = new Committer()
{
@Override
public Object getMetadata()
{
return null;
}
@Override
public void run()
{
// Do nothing
}
};
public static Supplier<Committer> supplierFromRunnable(final Runnable runnable)
{
return supplierOf(
new Committer()
{
@Override
public Object getMetadata()
{
return null;
}
@Override
public void run()
{
runnable.run();
}
}
);
}
public static Supplier<Committer> supplierFromFirehose(final Firehose firehose)
{
return new Supplier<Committer>()
{
@Override
public Committer get()
{
final Runnable commitRunnable = firehose.commit();
return new Committer()
{
@Override
public Object getMetadata()
{
return null;
}
@Override
public void run()
{
commitRunnable.run();
}
};
}
};
}
public static Supplier<Committer> supplierFromFirehoseV2(final FirehoseV2 firehose)
{
return new Supplier<Committer>()
{
@Override
public Committer get()
{
return firehose.makeCommitter();
}
};
}
public static Committer nil()
{
return NIL;
}
public static Supplier<Committer> supplierOf(final Committer committer)
{
return new Supplier<Committer>()
{
@Override
public Committer get()
{
return committer;
}
};
}
}

View File

@ -17,6 +17,7 @@
package io.druid.segment.realtime.plumber; package io.druid.segment.realtime.plumber;
import com.google.common.base.Supplier;
import io.druid.data.input.Committer; import io.druid.data.input.Committer;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.query.Query; import io.druid.query.Query;
@ -31,30 +32,30 @@ public interface Plumber
* *
* @return the metadata of the "newest" segment that might have previously been persisted * @return the metadata of the "newest" segment that might have previously been persisted
*/ */
public Object startJob(); Object startJob();
/** /**
* @param row - the row to insert * @param row the row to insert
* @param committerSupplier supplier of a committer associated with all data that has been added, including this row
*
* @return - positive numbers indicate how many summarized rows exist in the index for that timestamp, * @return - positive numbers indicate how many summarized rows exist in the index for that timestamp,
* -1 means a row was thrown away because it was too late * -1 means a row was thrown away because it was too late
*/ */
public int add(InputRow row) throws IndexSizeExceededException; int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException;
public <T> QueryRunner<T> getQueryRunner(Query<T> query);
<T> QueryRunner<T> getQueryRunner(Query<T> query);
/** /**
* Persist any in-memory indexed data to durable storage. This may be only somewhat durable, e.g. the * Persist any in-memory indexed data to durable storage. This may be only somewhat durable, e.g. the
* machine's local disk. * machine's local disk.
* *
* @param commitRunnable code to run after persisting data * @param committer committer to use after persisting data
*/ */
void persist(Committer commitRunnable); void persist(Committer committer);
void persist(Runnable commitRunnable);
/** /**
* Perform any final processing and clean up after ourselves. Should be called after all data has been * Perform any final processing and clean up after ourselves. Should be called after all data has been
* fed into sinks and persisted. * fed into sinks and persisted.
*/ */
public void finishJob(); void finishJob();
public Sink getSink(long timestamp);
} }

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
@ -118,6 +119,7 @@ public class RealtimePlumber implements Plumber
String.CASE_INSENSITIVE_ORDER String.CASE_INSENSITIVE_ORDER
); );
private volatile long nextFlush = 0;
private volatile boolean shuttingDown = false; private volatile boolean shuttingDown = false;
private volatile boolean stopped = false; private volatile boolean stopped = false;
private volatile boolean cleanShutdown = true; private volatile boolean cleanShutdown = true;
@ -186,22 +188,28 @@ public class RealtimePlumber implements Plumber
startPersistThread(); startPersistThread();
// Push pending sinks bootstrapped from previous run // Push pending sinks bootstrapped from previous run
mergeAndPush(); mergeAndPush();
resetNextFlush();
return retVal; return retVal;
} }
@Override @Override
public int add(InputRow row) throws IndexSizeExceededException public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
{ {
final Sink sink = getSink(row.getTimestampFromEpoch()); final Sink sink = getSink(row.getTimestampFromEpoch());
if (sink == null) { if (sink == null) {
return -1; return -1;
} }
return sink.add(row); final int numRows = sink.add(row);
if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {
persist(committerSupplier.get());
}
return numRows;
} }
public Sink getSink(long timestamp) private Sink getSink(long timestamp)
{ {
if (!rejectionPolicy.accept(timestamp)) { if (!rejectionPolicy.accept(timestamp)) {
return null; return null;
@ -338,27 +346,6 @@ public class RealtimePlumber implements Plumber
); );
} }
@Override
public void persist(final Runnable commitRunnable)
{
persist(
new Committer()
{
@Override
public Object getMetadata()
{
return null;
}
@Override
public void run()
{
commitRunnable.run();
}
}
);
}
@Override @Override
public void persist(final Committer committer) public void persist(final Committer committer)
{ {
@ -442,6 +429,7 @@ public class RealtimePlumber implements Plumber
log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay); log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay);
} }
runExecStopwatch.stop(); runExecStopwatch.stop();
resetNextFlush();
} }
// Submits persist-n-merge task for a Sink to the mergeExecutor // Submits persist-n-merge task for a Sink to the mergeExecutor
@ -606,6 +594,11 @@ public class RealtimePlumber implements Plumber
} }
} }
private void resetNextFlush()
{
nextFlush = new DateTime().plus(config.getIntermediatePersistPeriod()).getMillis();
}
protected void initializeExecutors() protected void initializeExecutors()
{ {
final int maxPendingPersists = config.getMaxPendingPersists(); final int maxPendingPersists = config.getMaxPendingPersists();

View File

@ -20,6 +20,7 @@
package io.druid.segment.realtime; package io.druid.segment.realtime;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.Granularity; import com.metamx.common.Granularity;
import com.metamx.common.ISE; import com.metamx.common.ISE;
@ -193,7 +194,7 @@ public class RealtimeManagerTest
Assert.assertEquals(2, realtimeManager.getMetrics("test").unparseable()); Assert.assertEquals(2, realtimeManager.getMetrics("test").unparseable());
Assert.assertTrue(plumber.isStartedJob()); Assert.assertTrue(plumber.isStartedJob());
Assert.assertTrue(plumber.isFinishedJob()); Assert.assertTrue(plumber.isFinishedJob());
Assert.assertEquals(1, plumber.getPersistCount()); Assert.assertEquals(0, plumber.getPersistCount());
} }
@Test @Test
@ -214,7 +215,7 @@ public class RealtimeManagerTest
Assert.assertEquals(2, realtimeManager2.getMetrics("testV2").unparseable()); Assert.assertEquals(2, realtimeManager2.getMetrics("testV2").unparseable());
Assert.assertTrue(plumber2.isStartedJob()); Assert.assertTrue(plumber2.isStartedJob());
Assert.assertTrue(plumber2.isFinishedJob()); Assert.assertTrue(plumber2.isFinishedJob());
Assert.assertEquals(1, plumber2.getPersistCount()); Assert.assertEquals(0, plumber2.getPersistCount());
} }
private TestInputRowHolder makeRow(final long timestamp) private TestInputRowHolder makeRow(final long timestamp)
@ -440,7 +441,7 @@ public class RealtimeManagerTest
} }
@Override @Override
public int add(InputRow row) throws IndexSizeExceededException public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
{ {
if (row == null) { if (row == null) {
return -1; return -1;
@ -470,7 +471,7 @@ public class RealtimeManagerTest
} }
@Override @Override
public void persist(Runnable commitRunnable) public void persist(Committer committer)
{ {
persistCount++; persistCount++;
} }
@ -480,11 +481,5 @@ public class RealtimeManagerTest
{ {
finishedJob = true; finishedJob = true;
} }
@Override
public void persist(Committer commitRunnable)
{
persistCount++;
}
} }
} }

View File

@ -20,6 +20,7 @@
package io.druid.segment.realtime.plumber; package io.druid.segment.realtime.plumber;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Suppliers;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.io.Files; import com.google.common.io.Files;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
@ -235,37 +236,22 @@ public class RealtimePlumberSchoolTest
EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L); EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L);
EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>()); EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>());
EasyMock.replay(row); EasyMock.replay(row);
plumber.add(row); final Committer committer = new Committer()
{
@Override
public Object getMetadata()
{
return commitMetadata;
}
if (commitMetadata != null) { @Override
plumber.persist( public void run()
new Committer() {
{ committed.setValue(true);
@Override }
public Object getMetadata() };
{ plumber.add(row, Suppliers.ofInstance(committer));
return commitMetadata; plumber.persist(committer);
}
@Override
public void run()
{
committed.setValue(true);
}
}
);
} else {
plumber.persist(
new Runnable()
{
@Override
public void run()
{
committed.setValue(true);
}
}
);
}
while (!committed.booleanValue()) { while (!committed.booleanValue()) {
Thread.sleep(100); Thread.sleep(100);
@ -293,23 +279,29 @@ public class RealtimePlumberSchoolTest
EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L); EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L);
EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>()); EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>());
EasyMock.replay(row); EasyMock.replay(row);
plumber.add(row); plumber.add(row, Committers.supplierOf(Committers.nil()));
plumber.persist( plumber.persist(
new Runnable() Committers.supplierFromRunnable(
{ new Runnable()
@Override {
public void run() @Override
{ public void run()
committed.setValue(true); {
throw new RuntimeException(); committed.setValue(true);
} throw new RuntimeException();
} }
}
).get()
); );
while (!committed.booleanValue()) { while (!committed.booleanValue()) {
Thread.sleep(100); Thread.sleep(100);
} }
// Exception may need time to propagate
while (metrics.failedPersists() < 1) {
Thread.sleep(100);
}
Assert.assertEquals(1, metrics.failedPersists()); Assert.assertEquals(1, metrics.failedPersists());
} }
} }