diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index cf6450428a4..a2084ce1a67 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -106,17 +107,23 @@ public class YeOldePlumberSchool implements PlumberSchool } @Override - public int add(InputRow row) throws IndexSizeExceededException + public int add(InputRow row, Supplier committerSupplier) throws IndexSizeExceededException { Sink sink = getSink(row.getTimestampFromEpoch()); if (sink == null) { return -1; } - return sink.add(row); + final int numRows = sink.add(row); + + if (!sink.canAppendRow()) { + persist(committerSupplier.get()); + } + + return numRows; } - public Sink getSink(long timestamp) + private Sink getSink(long timestamp) { if (theSink.getInterval().contains(timestamp)) { return theSink; @@ -132,10 +139,10 @@ public class YeOldePlumberSchool implements PlumberSchool } @Override - public void persist(Runnable commitRunnable) + public void persist(Committer committer) { spillIfSwappable(); - commitRunnable.run(); + committer.run(); } @Override @@ -229,12 +236,6 @@ public class YeOldePlumberSchool implements PlumberSchool { return new File(persistDir, String.format("spill%d", n)); } - - @Override - public void persist(Committer commitRunnable) - { - persist(commitRunnable); - } }; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index e353f93ada8..93c16d246f1 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -34,6 +35,7 @@ import com.google.common.hash.Hashing; import com.metamx.common.ISE; import com.metamx.common.guava.Comparators; import com.metamx.common.logger.Logger; +import io.druid.data.input.Committer; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; @@ -53,6 +55,7 @@ import io.druid.segment.indexing.TuningConfig; import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.plumber.Committers; import io.druid.segment.realtime.plumber.Plumber; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.HashBasedNumberedShardSpec; @@ -117,18 +120,18 @@ public class IndexTask extends AbstractFixedIntervalTask ); } - static RealtimeTuningConfig convertTuningConfig(ShardSpec spec, IndexTuningConfig config) + static RealtimeTuningConfig convertTuningConfig(ShardSpec shardSpec, int rowFlushBoundary, IndexSpec indexSpec) { return new RealtimeTuningConfig( - config.getRowFlushBoundary(), + rowFlushBoundary, null, null, null, null, null, null, - spec, - config.getIndexSpec(), + shardSpec, + indexSpec, null, null, null, @@ -340,9 +343,15 @@ public class IndexTask extends AbstractFixedIntervalTask } }; + // rowFlushBoundary for this job + final int myRowFlushBoundary = rowFlushBoundary > 0 + ? rowFlushBoundary + : toolbox.getConfig().getDefaultRowFlushBoundary(); + // Create firehose + plumber final FireDepartmentMetrics metrics = new FireDepartmentMetrics(); final Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser()); + final Supplier committerSupplier = Committers.supplierFromFirehose(firehose); final Plumber plumber = new YeOldePlumberSchool( interval, version, @@ -350,14 +359,10 @@ public class IndexTask extends AbstractFixedIntervalTask tmpDir ).findPlumber( schema, - convertTuningConfig(shardSpec, ingestionSchema.getTuningConfig()), + convertTuningConfig(shardSpec, myRowFlushBoundary, ingestionSchema.getTuningConfig().getIndexSpec()), metrics ); - // rowFlushBoundary for this job - final int myRowFlushBoundary = rowFlushBoundary > 0 - ? rowFlushBoundary - : toolbox.getConfig().getDefaultRowFlushBoundary(); final QueryGranularity rollupGran = ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity(); try { plumber.startJob(); @@ -366,7 +371,7 @@ public class IndexTask extends AbstractFixedIntervalTask final InputRow inputRow = firehose.nextRow(); if (shouldIndex(shardSpec, interval, inputRow, rollupGran)) { - int numRows = plumber.add(inputRow); + int numRows = plumber.add(inputRow, committerSupplier); if (numRows == -1) { throw new ISE( String.format( @@ -376,10 +381,6 @@ public class IndexTask extends AbstractFixedIntervalTask ); } metrics.incrementProcessed(); - - if (numRows >= myRowFlushBoundary) { - plumber.persist(firehose.commit()); - } } else { metrics.incrementThrownAway(); } @@ -389,7 +390,7 @@ public class IndexTask extends AbstractFixedIntervalTask firehose.close(); } - plumber.persist(firehose.commit()); + plumber.persist(committerSupplier.get()); try { plumber.finishJob(); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index a56da1a700d..50789991832 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -20,12 +20,14 @@ package io.druid.indexing.common.task; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.primitives.Ints; import com.metamx.common.guava.CloseQuietly; import com.metamx.common.parsers.ParseException; import com.metamx.emitter.EmittingLogger; +import io.druid.data.input.Committer; import io.druid.data.input.Firehose; import io.druid.data.input.InputRow; import io.druid.indexing.common.TaskLock; @@ -46,15 +48,15 @@ import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.SegmentPublisher; +import io.druid.segment.realtime.plumber.Committers; import io.druid.segment.realtime.plumber.Plumber; +import io.druid.segment.realtime.plumber.PlumberSchool; import io.druid.segment.realtime.plumber.RealtimePlumberSchool; -import io.druid.segment.realtime.plumber.Sink; import io.druid.segment.realtime.plumber.VersioningPolicy; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; -import org.joda.time.Period; import java.io.File; import java.io.IOException; @@ -78,8 +80,8 @@ public class RealtimeIndexTask extends AbstractTask static String makeTaskId(String dataSource, int partitionNumber, DateTime timestamp, int randomBits) { final StringBuilder suffix = new StringBuilder(8); - for(int i = 0; i < Ints.BYTES * 2; ++i) { - suffix.append((char)('a' + ((randomBits >>> (i * 4)) & 0x0F))); + for (int i = 0; i < Ints.BYTES * 2; ++i) { + suffix.append((char) ('a' + ((randomBits >>> (i * 4)) & 0x0F))); } return String.format( "index_realtime_%s_%d_%s_%s", @@ -248,8 +250,8 @@ public class RealtimeIndexTask extends AbstractTask DataSchema dataSchema = spec.getDataSchema(); RealtimeIOConfig realtimeIOConfig = spec.getIOConfig(); RealtimeTuningConfig tuningConfig = spec.getTuningConfig() - .withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")) - .withVersioningPolicy(versioningPolicy); + .withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")) + .withVersioningPolicy(versioningPolicy); final FireDepartment fireDepartment = new FireDepartment( dataSchema, @@ -263,7 +265,7 @@ public class RealtimeIndexTask extends AbstractTask // NOTE: that redundant realtime tasks will upload to the same location. This can cause index.zip and // NOTE: descriptor.json to mismatch, or it can cause historical nodes to load different instances of the // NOTE: "same" segment. - final RealtimePlumberSchool plumberSchool = new RealtimePlumberSchool( + final PlumberSchool plumberSchool = new RealtimePlumberSchool( toolbox.getEmitter(), toolbox.getQueryRunnerFactoryConglomerate(), toolbox.getSegmentPusher(), @@ -277,6 +279,7 @@ public class RealtimeIndexTask extends AbstractTask // Delay firehose connection to avoid claiming input resources while the plumber is starting up. Firehose firehose = null; + Supplier committerSupplier = null; try { plumber.startJob(); @@ -285,11 +288,10 @@ public class RealtimeIndexTask extends AbstractTask toolbox.getMonitorScheduler().addMonitor(metricsMonitor); // Set up firehose - final Period intermediatePersistPeriod = spec.getTuningConfig().getIntermediatePersistPeriod(); firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser()); + committerSupplier = Committers.supplierFromFirehose(firehose); // Time to read data! - long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); while (firehose.hasMore()) { final InputRow inputRow; @@ -308,25 +310,14 @@ public class RealtimeIndexTask extends AbstractTask continue; } - int currCount = plumber.add(inputRow); - if (currCount == -1) { + int numRows = plumber.add(inputRow, committerSupplier); + if (numRows == -1) { fireDepartment.getMetrics().incrementThrownAway(); log.debug("Throwing away event[%s]", inputRow); - - if (System.currentTimeMillis() > nextFlush) { - plumber.persist(firehose.commit()); - nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - } - continue; } fireDepartment.getMetrics().incrementProcessed(); - final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); - if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) { - plumber.persist(firehose.commit()); - nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - } } } catch (Throwable e) { @@ -338,7 +329,7 @@ public class RealtimeIndexTask extends AbstractTask finally { if (normalExit) { try { - plumber.persist(firehose.commit()); + plumber.persist(committerSupplier.get()); plumber.finishJob(); } catch (Exception e) { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index e106edd3383..5827c49f1a5 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -18,7 +18,6 @@ package io.druid.indexing.common.task; import com.google.common.collect.Lists; -import com.google.common.io.Files; import com.metamx.common.Granularity; import io.druid.data.input.impl.CSVParseSpec; import io.druid.data.input.impl.DimensionsSpec; @@ -308,7 +307,11 @@ public class IndexTaskTest null, new IndexSpec() ); - RealtimeTuningConfig realtimeTuningConfig = IndexTask.convertTuningConfig(spec, config); + RealtimeTuningConfig realtimeTuningConfig = IndexTask.convertTuningConfig( + spec, + config.getRowFlushBoundary(), + config.getIndexSpec() + ); Assert.assertEquals(realtimeTuningConfig.getMaxRowsInMemory(), config.getRowFlushBoundary()); Assert.assertEquals(realtimeTuningConfig.getShardSpec(), spec); Assert.assertEquals(realtimeTuningConfig.getIndexSpec(), indexSpec); diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 79ff660947e..818aac2f519 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -21,6 +21,7 @@ package io.druid.segment.realtime; import com.google.common.base.Function; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; @@ -48,11 +49,9 @@ import io.druid.query.UnionQueryRunner; import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.realtime.plumber.Committers; import io.druid.segment.realtime.plumber.Plumber; -import io.druid.segment.realtime.plumber.Sink; -import org.joda.time.DateTime; import org.joda.time.Interval; -import org.joda.time.Period; import java.io.Closeable; import java.io.IOException; @@ -304,7 +303,6 @@ public class RealtimeManager implements QuerySegmentWalker private void runFirehoseV2(FirehoseV2 firehose) { - final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod(); try { firehose.start(); } @@ -312,8 +310,9 @@ public class RealtimeManager implements QuerySegmentWalker log.error(e, "Failed to start firehoseV2"); return; } - long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - log.info("FirehoseV2 started with nextFlush [%s]", nextFlush); + + log.info("FirehoseV2 started"); + final Supplier committerSupplier = Committers.supplierFromFirehoseV2(firehose); boolean haveRow = true; while (haveRow) { InputRow inputRow = null; @@ -321,14 +320,7 @@ public class RealtimeManager implements QuerySegmentWalker try { inputRow = firehose.currRow(); if (inputRow != null) { - try { - numRows = plumber.add(inputRow); - } - catch (IndexSizeExceededException e) { - log.debug(e, "Index limit exceeded: %s", e.getMessage()); - nextFlush = doIncrementalPersist(firehose.makeCommitter(), intermediatePersistPeriod); - continue; - } + numRows = plumber.add(inputRow, committerSupplier); if (numRows < 0) { metrics.incrementThrownAway(); log.debug("Throwing away event[%s]", inputRow); @@ -351,88 +343,50 @@ public class RealtimeManager implements QuerySegmentWalker catch (Exception e) { log.debug(e, "exception in firehose.advance(), considering unparseable row"); metrics.incrementUnparseable(); - continue; - } - - try { - final Sink sink = inputRow != null ? plumber.getSink(inputRow.getTimestampFromEpoch()) : null; - if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) { - nextFlush = doIncrementalPersist(firehose.makeCommitter(), intermediatePersistPeriod); - } - } - catch (Exception e) { - log.makeAlert( - e, - "An exception happened while queue to persist!? We hope it is transient. Ignore and continue." - ); } } } - private long doIncrementalPersist(Committer committer, Period intermediatePersistPeriod) - { - plumber.persist(committer); - return new DateTime().plus(intermediatePersistPeriod).getMillis(); - } - private void runFirehose(Firehose firehose) { - - final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod(); - - long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - + final Supplier committerSupplier = Committers.supplierFromFirehose(firehose); while (firehose.hasMore()) { - InputRow inputRow = null; + final InputRow inputRow; try { - try { - inputRow = firehose.nextRow(); + inputRow = firehose.nextRow(); - if (inputRow == null) { - log.debug("thrown away null input row, considering unparseable"); - metrics.incrementUnparseable(); - continue; - } - } - catch (Exception e) { - log.debug(e, "thrown away line due to exception, considering unparseable"); + if (inputRow == null) { + log.debug("thrown away null input row, considering unparseable"); metrics.incrementUnparseable(); continue; } - - boolean lateEvent = false; - boolean indexLimitExceeded = false; - try { - lateEvent = plumber.add(inputRow) == -1; - } - catch (IndexSizeExceededException e) { - log.info("Index limit exceeded: %s", e.getMessage()); - indexLimitExceeded = true; - } - if (indexLimitExceeded || lateEvent) { - metrics.incrementThrownAway(); - log.debug("Throwing away event[%s]", inputRow); - - if (indexLimitExceeded || System.currentTimeMillis() > nextFlush) { - plumber.persist(firehose.commit()); - nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - } - - continue; - } - final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); - if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) { - plumber.persist(firehose.commit()); - nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - } - metrics.incrementProcessed(); } catch (ParseException e) { - if (inputRow != null) { - log.error(e, "unparseable line: %s", inputRow); - } + log.debug(e, "thrown away line due to exception, considering unparseable"); metrics.incrementUnparseable(); + continue; } + + boolean lateEvent = false; + boolean indexLimitExceeded = false; + try { + lateEvent = plumber.add(inputRow, committerSupplier) == -1; + } + catch (IndexSizeExceededException e) { + log.info("Index limit exceeded: %s", e.getMessage()); + indexLimitExceeded = true; + } + if (indexLimitExceeded || lateEvent) { + metrics.incrementThrownAway(); + log.debug("Throwing away event[%s]", inputRow); + + if (indexLimitExceeded) { + plumber.persist(committerSupplier.get()); + } + + continue; + } + metrics.incrementProcessed(); } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Committers.java b/server/src/main/java/io/druid/segment/realtime/plumber/Committers.java new file mode 100644 index 00000000000..b3089c592fa --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Committers.java @@ -0,0 +1,118 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.plumber; + +import com.google.common.base.Supplier; +import io.druid.data.input.Committer; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseV2; + +public class Committers +{ + private static final Committer NIL = new Committer() + { + @Override + public Object getMetadata() + { + return null; + } + + @Override + public void run() + { + // Do nothing + } + }; + + public static Supplier supplierFromRunnable(final Runnable runnable) + { + return supplierOf( + new Committer() + { + @Override + public Object getMetadata() + { + return null; + } + + @Override + public void run() + { + runnable.run(); + } + } + ); + } + + public static Supplier supplierFromFirehose(final Firehose firehose) + { + return new Supplier() + { + @Override + public Committer get() + { + final Runnable commitRunnable = firehose.commit(); + return new Committer() + { + @Override + public Object getMetadata() + { + return null; + } + + @Override + public void run() + { + commitRunnable.run(); + } + }; + } + }; + } + + public static Supplier supplierFromFirehoseV2(final FirehoseV2 firehose) + { + return new Supplier() + { + @Override + public Committer get() + { + return firehose.makeCommitter(); + } + }; + } + + public static Committer nil() + { + return NIL; + } + + public static Supplier supplierOf(final Committer committer) + { + return new Supplier() + { + @Override + public Committer get() + { + return committer; + } + }; + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java index 01ef058212f..6e7227365ca 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java @@ -17,6 +17,7 @@ package io.druid.segment.realtime.plumber; +import com.google.common.base.Supplier; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.query.Query; @@ -31,30 +32,30 @@ public interface Plumber * * @return the metadata of the "newest" segment that might have previously been persisted */ - public Object startJob(); + Object startJob(); /** - * @param row - the row to insert + * @param row the row to insert + * @param committerSupplier supplier of a committer associated with all data that has been added, including this row + * * @return - positive numbers indicate how many summarized rows exist in the index for that timestamp, * -1 means a row was thrown away because it was too late */ - public int add(InputRow row) throws IndexSizeExceededException; - public QueryRunner getQueryRunner(Query query); + int add(InputRow row, Supplier committerSupplier) throws IndexSizeExceededException; + + QueryRunner getQueryRunner(Query query); /** * Persist any in-memory indexed data to durable storage. This may be only somewhat durable, e.g. the * machine's local disk. * - * @param commitRunnable code to run after persisting data + * @param committer committer to use after persisting data */ - void persist(Committer commitRunnable); - void persist(Runnable commitRunnable); + void persist(Committer committer); /** * Perform any final processing and clean up after ourselves. Should be called after all data has been * fed into sinks and persisted. */ - public void finishJob(); - - public Sink getSink(long timestamp); + void finishJob(); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index aef5cef80fa..0adca56b2b0 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -118,6 +119,7 @@ public class RealtimePlumber implements Plumber String.CASE_INSENSITIVE_ORDER ); + private volatile long nextFlush = 0; private volatile boolean shuttingDown = false; private volatile boolean stopped = false; private volatile boolean cleanShutdown = true; @@ -186,22 +188,28 @@ public class RealtimePlumber implements Plumber startPersistThread(); // Push pending sinks bootstrapped from previous run mergeAndPush(); - + resetNextFlush(); return retVal; } @Override - public int add(InputRow row) throws IndexSizeExceededException + public int add(InputRow row, Supplier committerSupplier) throws IndexSizeExceededException { final Sink sink = getSink(row.getTimestampFromEpoch()); if (sink == null) { return -1; } - return sink.add(row); + final int numRows = sink.add(row); + + if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) { + persist(committerSupplier.get()); + } + + return numRows; } - public Sink getSink(long timestamp) + private Sink getSink(long timestamp) { if (!rejectionPolicy.accept(timestamp)) { return null; @@ -338,27 +346,6 @@ public class RealtimePlumber implements Plumber ); } - @Override - public void persist(final Runnable commitRunnable) - { - persist( - new Committer() - { - @Override - public Object getMetadata() - { - return null; - } - - @Override - public void run() - { - commitRunnable.run(); - } - } - ); - } - @Override public void persist(final Committer committer) { @@ -442,6 +429,7 @@ public class RealtimePlumber implements Plumber log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay); } runExecStopwatch.stop(); + resetNextFlush(); } // Submits persist-n-merge task for a Sink to the mergeExecutor @@ -606,6 +594,11 @@ public class RealtimePlumber implements Plumber } } + private void resetNextFlush() + { + nextFlush = new DateTime().plus(config.getIntermediatePersistPeriod()).getMillis(); + } + protected void initializeExecutors() { final int maxPendingPersists = config.getMaxPendingPersists(); diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index fcb8a039f85..a3bbc0ffb45 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -20,6 +20,7 @@ package io.druid.segment.realtime; import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.metamx.common.Granularity; import com.metamx.common.ISE; @@ -193,7 +194,7 @@ public class RealtimeManagerTest Assert.assertEquals(2, realtimeManager.getMetrics("test").unparseable()); Assert.assertTrue(plumber.isStartedJob()); Assert.assertTrue(plumber.isFinishedJob()); - Assert.assertEquals(1, plumber.getPersistCount()); + Assert.assertEquals(0, plumber.getPersistCount()); } @Test @@ -214,7 +215,7 @@ public class RealtimeManagerTest Assert.assertEquals(2, realtimeManager2.getMetrics("testV2").unparseable()); Assert.assertTrue(plumber2.isStartedJob()); Assert.assertTrue(plumber2.isFinishedJob()); - Assert.assertEquals(1, plumber2.getPersistCount()); + Assert.assertEquals(0, plumber2.getPersistCount()); } private TestInputRowHolder makeRow(final long timestamp) @@ -440,7 +441,7 @@ public class RealtimeManagerTest } @Override - public int add(InputRow row) throws IndexSizeExceededException + public int add(InputRow row, Supplier committerSupplier) throws IndexSizeExceededException { if (row == null) { return -1; @@ -470,7 +471,7 @@ public class RealtimeManagerTest } @Override - public void persist(Runnable commitRunnable) + public void persist(Committer committer) { persistCount++; } @@ -480,11 +481,5 @@ public class RealtimeManagerTest { finishedJob = true; } - - @Override - public void persist(Committer commitRunnable) - { - persistCount++; - } } } diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 03f8c78437c..0cc211c7a30 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -20,6 +20,7 @@ package io.druid.segment.realtime.plumber; import com.google.common.base.Predicate; +import com.google.common.base.Suppliers; import com.google.common.collect.Maps; import com.google.common.io.Files; import com.google.common.util.concurrent.MoreExecutors; @@ -235,37 +236,22 @@ public class RealtimePlumberSchoolTest EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L); EasyMock.expect(row.getDimensions()).andReturn(new ArrayList()); EasyMock.replay(row); - plumber.add(row); + final Committer committer = new Committer() + { + @Override + public Object getMetadata() + { + return commitMetadata; + } - if (commitMetadata != null) { - plumber.persist( - new Committer() - { - @Override - public Object getMetadata() - { - return commitMetadata; - } - - @Override - public void run() - { - committed.setValue(true); - } - } - ); - } else { - plumber.persist( - new Runnable() - { - @Override - public void run() - { - committed.setValue(true); - } - } - ); - } + @Override + public void run() + { + committed.setValue(true); + } + }; + plumber.add(row, Suppliers.ofInstance(committer)); + plumber.persist(committer); while (!committed.booleanValue()) { Thread.sleep(100); @@ -293,23 +279,29 @@ public class RealtimePlumberSchoolTest EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L); EasyMock.expect(row.getDimensions()).andReturn(new ArrayList()); EasyMock.replay(row); - plumber.add(row); + plumber.add(row, Committers.supplierOf(Committers.nil())); plumber.persist( - new Runnable() - { - @Override - public void run() - { - committed.setValue(true); - throw new RuntimeException(); - } - } + Committers.supplierFromRunnable( + new Runnable() + { + @Override + public void run() + { + committed.setValue(true); + throw new RuntimeException(); + } + } + ).get() ); - while (!committed.booleanValue()) { Thread.sleep(100); } + // Exception may need time to propagate + while (metrics.failedPersists() < 1) { + Thread.sleep(100); + } + Assert.assertEquals(1, metrics.failedPersists()); } }