From 062a47fba4fd62eb9672c2e639c805313c5d1989 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sat, 25 Jul 2015 08:29:02 -0700 Subject: [PATCH] Modify Plumbers in these ways, 1) Persist using Committer instead of Runnable. (Although the metadata object is ignored in this patch) 2) Remove the getSink method. 3) Plumbers are now responsible for time-based and hydrant-full-based periodic committing. (FireChief, RealtimeIndexTask, and IndexTask used to do this) --- .../common/index/YeOldePlumberSchool.java | 23 ++-- .../druid/indexing/common/task/IndexTask.java | 31 ++--- .../common/task/RealtimeIndexTask.java | 37 +++--- .../indexing/common/task/IndexTaskTest.java | 7 +- .../segment/realtime/RealtimeManager.java | 114 +++++------------ .../segment/realtime/plumber/Committers.java | 118 ++++++++++++++++++ .../segment/realtime/plumber/Plumber.java | 21 ++-- .../realtime/plumber/RealtimePlumber.java | 43 +++---- .../segment/realtime/RealtimeManagerTest.java | 15 +-- .../plumber/RealtimePlumberSchoolTest.java | 74 +++++------ 10 files changed, 266 insertions(+), 217 deletions(-) create mode 100644 server/src/main/java/io/druid/segment/realtime/plumber/Committers.java diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index cf6450428a4..a2084ce1a67 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -106,17 +107,23 @@ public class YeOldePlumberSchool implements PlumberSchool } @Override - public int add(InputRow row) throws IndexSizeExceededException + public int add(InputRow row, Supplier committerSupplier) throws IndexSizeExceededException { Sink sink = getSink(row.getTimestampFromEpoch()); if (sink == null) { return -1; } - return sink.add(row); + final int numRows = sink.add(row); + + if (!sink.canAppendRow()) { + persist(committerSupplier.get()); + } + + return numRows; } - public Sink getSink(long timestamp) + private Sink getSink(long timestamp) { if (theSink.getInterval().contains(timestamp)) { return theSink; @@ -132,10 +139,10 @@ public class YeOldePlumberSchool implements PlumberSchool } @Override - public void persist(Runnable commitRunnable) + public void persist(Committer committer) { spillIfSwappable(); - commitRunnable.run(); + committer.run(); } @Override @@ -229,12 +236,6 @@ public class YeOldePlumberSchool implements PlumberSchool { return new File(persistDir, String.format("spill%d", n)); } - - @Override - public void persist(Committer commitRunnable) - { - persist(commitRunnable); - } }; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index e353f93ada8..93c16d246f1 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -34,6 +35,7 @@ import com.google.common.hash.Hashing; import com.metamx.common.ISE; import com.metamx.common.guava.Comparators; import com.metamx.common.logger.Logger; +import io.druid.data.input.Committer; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; @@ -53,6 +55,7 @@ import io.druid.segment.indexing.TuningConfig; import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.plumber.Committers; import io.druid.segment.realtime.plumber.Plumber; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.HashBasedNumberedShardSpec; @@ -117,18 +120,18 @@ public class IndexTask extends AbstractFixedIntervalTask ); } - static RealtimeTuningConfig convertTuningConfig(ShardSpec spec, IndexTuningConfig config) + static RealtimeTuningConfig convertTuningConfig(ShardSpec shardSpec, int rowFlushBoundary, IndexSpec indexSpec) { return new RealtimeTuningConfig( - config.getRowFlushBoundary(), + rowFlushBoundary, null, null, null, null, null, null, - spec, - config.getIndexSpec(), + shardSpec, + indexSpec, null, null, null, @@ -340,9 +343,15 @@ public class IndexTask extends AbstractFixedIntervalTask } }; + // rowFlushBoundary for this job + final int myRowFlushBoundary = rowFlushBoundary > 0 + ? rowFlushBoundary + : toolbox.getConfig().getDefaultRowFlushBoundary(); + // Create firehose + plumber final FireDepartmentMetrics metrics = new FireDepartmentMetrics(); final Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser()); + final Supplier committerSupplier = Committers.supplierFromFirehose(firehose); final Plumber plumber = new YeOldePlumberSchool( interval, version, @@ -350,14 +359,10 @@ public class IndexTask extends AbstractFixedIntervalTask tmpDir ).findPlumber( schema, - convertTuningConfig(shardSpec, ingestionSchema.getTuningConfig()), + convertTuningConfig(shardSpec, myRowFlushBoundary, ingestionSchema.getTuningConfig().getIndexSpec()), metrics ); - // rowFlushBoundary for this job - final int myRowFlushBoundary = rowFlushBoundary > 0 - ? rowFlushBoundary - : toolbox.getConfig().getDefaultRowFlushBoundary(); final QueryGranularity rollupGran = ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity(); try { plumber.startJob(); @@ -366,7 +371,7 @@ public class IndexTask extends AbstractFixedIntervalTask final InputRow inputRow = firehose.nextRow(); if (shouldIndex(shardSpec, interval, inputRow, rollupGran)) { - int numRows = plumber.add(inputRow); + int numRows = plumber.add(inputRow, committerSupplier); if (numRows == -1) { throw new ISE( String.format( @@ -376,10 +381,6 @@ public class IndexTask extends AbstractFixedIntervalTask ); } metrics.incrementProcessed(); - - if (numRows >= myRowFlushBoundary) { - plumber.persist(firehose.commit()); - } } else { metrics.incrementThrownAway(); } @@ -389,7 +390,7 @@ public class IndexTask extends AbstractFixedIntervalTask firehose.close(); } - plumber.persist(firehose.commit()); + plumber.persist(committerSupplier.get()); try { plumber.finishJob(); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index a56da1a700d..50789991832 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -20,12 +20,14 @@ package io.druid.indexing.common.task; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.primitives.Ints; import com.metamx.common.guava.CloseQuietly; import com.metamx.common.parsers.ParseException; import com.metamx.emitter.EmittingLogger; +import io.druid.data.input.Committer; import io.druid.data.input.Firehose; import io.druid.data.input.InputRow; import io.druid.indexing.common.TaskLock; @@ -46,15 +48,15 @@ import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.SegmentPublisher; +import io.druid.segment.realtime.plumber.Committers; import io.druid.segment.realtime.plumber.Plumber; +import io.druid.segment.realtime.plumber.PlumberSchool; import io.druid.segment.realtime.plumber.RealtimePlumberSchool; -import io.druid.segment.realtime.plumber.Sink; import io.druid.segment.realtime.plumber.VersioningPolicy; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; -import org.joda.time.Period; import java.io.File; import java.io.IOException; @@ -78,8 +80,8 @@ public class RealtimeIndexTask extends AbstractTask static String makeTaskId(String dataSource, int partitionNumber, DateTime timestamp, int randomBits) { final StringBuilder suffix = new StringBuilder(8); - for(int i = 0; i < Ints.BYTES * 2; ++i) { - suffix.append((char)('a' + ((randomBits >>> (i * 4)) & 0x0F))); + for (int i = 0; i < Ints.BYTES * 2; ++i) { + suffix.append((char) ('a' + ((randomBits >>> (i * 4)) & 0x0F))); } return String.format( "index_realtime_%s_%d_%s_%s", @@ -248,8 +250,8 @@ public class RealtimeIndexTask extends AbstractTask DataSchema dataSchema = spec.getDataSchema(); RealtimeIOConfig realtimeIOConfig = spec.getIOConfig(); RealtimeTuningConfig tuningConfig = spec.getTuningConfig() - .withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")) - .withVersioningPolicy(versioningPolicy); + .withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")) + .withVersioningPolicy(versioningPolicy); final FireDepartment fireDepartment = new FireDepartment( dataSchema, @@ -263,7 +265,7 @@ public class RealtimeIndexTask extends AbstractTask // NOTE: that redundant realtime tasks will upload to the same location. This can cause index.zip and // NOTE: descriptor.json to mismatch, or it can cause historical nodes to load different instances of the // NOTE: "same" segment. - final RealtimePlumberSchool plumberSchool = new RealtimePlumberSchool( + final PlumberSchool plumberSchool = new RealtimePlumberSchool( toolbox.getEmitter(), toolbox.getQueryRunnerFactoryConglomerate(), toolbox.getSegmentPusher(), @@ -277,6 +279,7 @@ public class RealtimeIndexTask extends AbstractTask // Delay firehose connection to avoid claiming input resources while the plumber is starting up. Firehose firehose = null; + Supplier committerSupplier = null; try { plumber.startJob(); @@ -285,11 +288,10 @@ public class RealtimeIndexTask extends AbstractTask toolbox.getMonitorScheduler().addMonitor(metricsMonitor); // Set up firehose - final Period intermediatePersistPeriod = spec.getTuningConfig().getIntermediatePersistPeriod(); firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser()); + committerSupplier = Committers.supplierFromFirehose(firehose); // Time to read data! - long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); while (firehose.hasMore()) { final InputRow inputRow; @@ -308,25 +310,14 @@ public class RealtimeIndexTask extends AbstractTask continue; } - int currCount = plumber.add(inputRow); - if (currCount == -1) { + int numRows = plumber.add(inputRow, committerSupplier); + if (numRows == -1) { fireDepartment.getMetrics().incrementThrownAway(); log.debug("Throwing away event[%s]", inputRow); - - if (System.currentTimeMillis() > nextFlush) { - plumber.persist(firehose.commit()); - nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - } - continue; } fireDepartment.getMetrics().incrementProcessed(); - final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); - if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) { - plumber.persist(firehose.commit()); - nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - } } } catch (Throwable e) { @@ -338,7 +329,7 @@ public class RealtimeIndexTask extends AbstractTask finally { if (normalExit) { try { - plumber.persist(firehose.commit()); + plumber.persist(committerSupplier.get()); plumber.finishJob(); } catch (Exception e) { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index e106edd3383..5827c49f1a5 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -18,7 +18,6 @@ package io.druid.indexing.common.task; import com.google.common.collect.Lists; -import com.google.common.io.Files; import com.metamx.common.Granularity; import io.druid.data.input.impl.CSVParseSpec; import io.druid.data.input.impl.DimensionsSpec; @@ -308,7 +307,11 @@ public class IndexTaskTest null, new IndexSpec() ); - RealtimeTuningConfig realtimeTuningConfig = IndexTask.convertTuningConfig(spec, config); + RealtimeTuningConfig realtimeTuningConfig = IndexTask.convertTuningConfig( + spec, + config.getRowFlushBoundary(), + config.getIndexSpec() + ); Assert.assertEquals(realtimeTuningConfig.getMaxRowsInMemory(), config.getRowFlushBoundary()); Assert.assertEquals(realtimeTuningConfig.getShardSpec(), spec); Assert.assertEquals(realtimeTuningConfig.getIndexSpec(), indexSpec); diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 79ff660947e..818aac2f519 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -21,6 +21,7 @@ package io.druid.segment.realtime; import com.google.common.base.Function; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; @@ -48,11 +49,9 @@ import io.druid.query.UnionQueryRunner; import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.realtime.plumber.Committers; import io.druid.segment.realtime.plumber.Plumber; -import io.druid.segment.realtime.plumber.Sink; -import org.joda.time.DateTime; import org.joda.time.Interval; -import org.joda.time.Period; import java.io.Closeable; import java.io.IOException; @@ -304,7 +303,6 @@ public class RealtimeManager implements QuerySegmentWalker private void runFirehoseV2(FirehoseV2 firehose) { - final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod(); try { firehose.start(); } @@ -312,8 +310,9 @@ public class RealtimeManager implements QuerySegmentWalker log.error(e, "Failed to start firehoseV2"); return; } - long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - log.info("FirehoseV2 started with nextFlush [%s]", nextFlush); + + log.info("FirehoseV2 started"); + final Supplier committerSupplier = Committers.supplierFromFirehoseV2(firehose); boolean haveRow = true; while (haveRow) { InputRow inputRow = null; @@ -321,14 +320,7 @@ public class RealtimeManager implements QuerySegmentWalker try { inputRow = firehose.currRow(); if (inputRow != null) { - try { - numRows = plumber.add(inputRow); - } - catch (IndexSizeExceededException e) { - log.debug(e, "Index limit exceeded: %s", e.getMessage()); - nextFlush = doIncrementalPersist(firehose.makeCommitter(), intermediatePersistPeriod); - continue; - } + numRows = plumber.add(inputRow, committerSupplier); if (numRows < 0) { metrics.incrementThrownAway(); log.debug("Throwing away event[%s]", inputRow); @@ -351,88 +343,50 @@ public class RealtimeManager implements QuerySegmentWalker catch (Exception e) { log.debug(e, "exception in firehose.advance(), considering unparseable row"); metrics.incrementUnparseable(); - continue; - } - - try { - final Sink sink = inputRow != null ? plumber.getSink(inputRow.getTimestampFromEpoch()) : null; - if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) { - nextFlush = doIncrementalPersist(firehose.makeCommitter(), intermediatePersistPeriod); - } - } - catch (Exception e) { - log.makeAlert( - e, - "An exception happened while queue to persist!? We hope it is transient. Ignore and continue." - ); } } } - private long doIncrementalPersist(Committer committer, Period intermediatePersistPeriod) - { - plumber.persist(committer); - return new DateTime().plus(intermediatePersistPeriod).getMillis(); - } - private void runFirehose(Firehose firehose) { - - final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod(); - - long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - + final Supplier committerSupplier = Committers.supplierFromFirehose(firehose); while (firehose.hasMore()) { - InputRow inputRow = null; + final InputRow inputRow; try { - try { - inputRow = firehose.nextRow(); + inputRow = firehose.nextRow(); - if (inputRow == null) { - log.debug("thrown away null input row, considering unparseable"); - metrics.incrementUnparseable(); - continue; - } - } - catch (Exception e) { - log.debug(e, "thrown away line due to exception, considering unparseable"); + if (inputRow == null) { + log.debug("thrown away null input row, considering unparseable"); metrics.incrementUnparseable(); continue; } - - boolean lateEvent = false; - boolean indexLimitExceeded = false; - try { - lateEvent = plumber.add(inputRow) == -1; - } - catch (IndexSizeExceededException e) { - log.info("Index limit exceeded: %s", e.getMessage()); - indexLimitExceeded = true; - } - if (indexLimitExceeded || lateEvent) { - metrics.incrementThrownAway(); - log.debug("Throwing away event[%s]", inputRow); - - if (indexLimitExceeded || System.currentTimeMillis() > nextFlush) { - plumber.persist(firehose.commit()); - nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - } - - continue; - } - final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); - if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) { - plumber.persist(firehose.commit()); - nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - } - metrics.incrementProcessed(); } catch (ParseException e) { - if (inputRow != null) { - log.error(e, "unparseable line: %s", inputRow); - } + log.debug(e, "thrown away line due to exception, considering unparseable"); metrics.incrementUnparseable(); + continue; } + + boolean lateEvent = false; + boolean indexLimitExceeded = false; + try { + lateEvent = plumber.add(inputRow, committerSupplier) == -1; + } + catch (IndexSizeExceededException e) { + log.info("Index limit exceeded: %s", e.getMessage()); + indexLimitExceeded = true; + } + if (indexLimitExceeded || lateEvent) { + metrics.incrementThrownAway(); + log.debug("Throwing away event[%s]", inputRow); + + if (indexLimitExceeded) { + plumber.persist(committerSupplier.get()); + } + + continue; + } + metrics.incrementProcessed(); } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Committers.java b/server/src/main/java/io/druid/segment/realtime/plumber/Committers.java new file mode 100644 index 00000000000..b3089c592fa --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Committers.java @@ -0,0 +1,118 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.plumber; + +import com.google.common.base.Supplier; +import io.druid.data.input.Committer; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseV2; + +public class Committers +{ + private static final Committer NIL = new Committer() + { + @Override + public Object getMetadata() + { + return null; + } + + @Override + public void run() + { + // Do nothing + } + }; + + public static Supplier supplierFromRunnable(final Runnable runnable) + { + return supplierOf( + new Committer() + { + @Override + public Object getMetadata() + { + return null; + } + + @Override + public void run() + { + runnable.run(); + } + } + ); + } + + public static Supplier supplierFromFirehose(final Firehose firehose) + { + return new Supplier() + { + @Override + public Committer get() + { + final Runnable commitRunnable = firehose.commit(); + return new Committer() + { + @Override + public Object getMetadata() + { + return null; + } + + @Override + public void run() + { + commitRunnable.run(); + } + }; + } + }; + } + + public static Supplier supplierFromFirehoseV2(final FirehoseV2 firehose) + { + return new Supplier() + { + @Override + public Committer get() + { + return firehose.makeCommitter(); + } + }; + } + + public static Committer nil() + { + return NIL; + } + + public static Supplier supplierOf(final Committer committer) + { + return new Supplier() + { + @Override + public Committer get() + { + return committer; + } + }; + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java index 01ef058212f..6e7227365ca 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java @@ -17,6 +17,7 @@ package io.druid.segment.realtime.plumber; +import com.google.common.base.Supplier; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.query.Query; @@ -31,30 +32,30 @@ public interface Plumber * * @return the metadata of the "newest" segment that might have previously been persisted */ - public Object startJob(); + Object startJob(); /** - * @param row - the row to insert + * @param row the row to insert + * @param committerSupplier supplier of a committer associated with all data that has been added, including this row + * * @return - positive numbers indicate how many summarized rows exist in the index for that timestamp, * -1 means a row was thrown away because it was too late */ - public int add(InputRow row) throws IndexSizeExceededException; - public QueryRunner getQueryRunner(Query query); + int add(InputRow row, Supplier committerSupplier) throws IndexSizeExceededException; + + QueryRunner getQueryRunner(Query query); /** * Persist any in-memory indexed data to durable storage. This may be only somewhat durable, e.g. the * machine's local disk. * - * @param commitRunnable code to run after persisting data + * @param committer committer to use after persisting data */ - void persist(Committer commitRunnable); - void persist(Runnable commitRunnable); + void persist(Committer committer); /** * Perform any final processing and clean up after ourselves. Should be called after all data has been * fed into sinks and persisted. */ - public void finishJob(); - - public Sink getSink(long timestamp); + void finishJob(); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index aef5cef80fa..0adca56b2b0 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -118,6 +119,7 @@ public class RealtimePlumber implements Plumber String.CASE_INSENSITIVE_ORDER ); + private volatile long nextFlush = 0; private volatile boolean shuttingDown = false; private volatile boolean stopped = false; private volatile boolean cleanShutdown = true; @@ -186,22 +188,28 @@ public class RealtimePlumber implements Plumber startPersistThread(); // Push pending sinks bootstrapped from previous run mergeAndPush(); - + resetNextFlush(); return retVal; } @Override - public int add(InputRow row) throws IndexSizeExceededException + public int add(InputRow row, Supplier committerSupplier) throws IndexSizeExceededException { final Sink sink = getSink(row.getTimestampFromEpoch()); if (sink == null) { return -1; } - return sink.add(row); + final int numRows = sink.add(row); + + if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) { + persist(committerSupplier.get()); + } + + return numRows; } - public Sink getSink(long timestamp) + private Sink getSink(long timestamp) { if (!rejectionPolicy.accept(timestamp)) { return null; @@ -338,27 +346,6 @@ public class RealtimePlumber implements Plumber ); } - @Override - public void persist(final Runnable commitRunnable) - { - persist( - new Committer() - { - @Override - public Object getMetadata() - { - return null; - } - - @Override - public void run() - { - commitRunnable.run(); - } - } - ); - } - @Override public void persist(final Committer committer) { @@ -442,6 +429,7 @@ public class RealtimePlumber implements Plumber log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay); } runExecStopwatch.stop(); + resetNextFlush(); } // Submits persist-n-merge task for a Sink to the mergeExecutor @@ -606,6 +594,11 @@ public class RealtimePlumber implements Plumber } } + private void resetNextFlush() + { + nextFlush = new DateTime().plus(config.getIntermediatePersistPeriod()).getMillis(); + } + protected void initializeExecutors() { final int maxPendingPersists = config.getMaxPendingPersists(); diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index fcb8a039f85..a3bbc0ffb45 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -20,6 +20,7 @@ package io.druid.segment.realtime; import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.metamx.common.Granularity; import com.metamx.common.ISE; @@ -193,7 +194,7 @@ public class RealtimeManagerTest Assert.assertEquals(2, realtimeManager.getMetrics("test").unparseable()); Assert.assertTrue(plumber.isStartedJob()); Assert.assertTrue(plumber.isFinishedJob()); - Assert.assertEquals(1, plumber.getPersistCount()); + Assert.assertEquals(0, plumber.getPersistCount()); } @Test @@ -214,7 +215,7 @@ public class RealtimeManagerTest Assert.assertEquals(2, realtimeManager2.getMetrics("testV2").unparseable()); Assert.assertTrue(plumber2.isStartedJob()); Assert.assertTrue(plumber2.isFinishedJob()); - Assert.assertEquals(1, plumber2.getPersistCount()); + Assert.assertEquals(0, plumber2.getPersistCount()); } private TestInputRowHolder makeRow(final long timestamp) @@ -440,7 +441,7 @@ public class RealtimeManagerTest } @Override - public int add(InputRow row) throws IndexSizeExceededException + public int add(InputRow row, Supplier committerSupplier) throws IndexSizeExceededException { if (row == null) { return -1; @@ -470,7 +471,7 @@ public class RealtimeManagerTest } @Override - public void persist(Runnable commitRunnable) + public void persist(Committer committer) { persistCount++; } @@ -480,11 +481,5 @@ public class RealtimeManagerTest { finishedJob = true; } - - @Override - public void persist(Committer commitRunnable) - { - persistCount++; - } } } diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 03f8c78437c..0cc211c7a30 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -20,6 +20,7 @@ package io.druid.segment.realtime.plumber; import com.google.common.base.Predicate; +import com.google.common.base.Suppliers; import com.google.common.collect.Maps; import com.google.common.io.Files; import com.google.common.util.concurrent.MoreExecutors; @@ -235,37 +236,22 @@ public class RealtimePlumberSchoolTest EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L); EasyMock.expect(row.getDimensions()).andReturn(new ArrayList()); EasyMock.replay(row); - plumber.add(row); + final Committer committer = new Committer() + { + @Override + public Object getMetadata() + { + return commitMetadata; + } - if (commitMetadata != null) { - plumber.persist( - new Committer() - { - @Override - public Object getMetadata() - { - return commitMetadata; - } - - @Override - public void run() - { - committed.setValue(true); - } - } - ); - } else { - plumber.persist( - new Runnable() - { - @Override - public void run() - { - committed.setValue(true); - } - } - ); - } + @Override + public void run() + { + committed.setValue(true); + } + }; + plumber.add(row, Suppliers.ofInstance(committer)); + plumber.persist(committer); while (!committed.booleanValue()) { Thread.sleep(100); @@ -293,23 +279,29 @@ public class RealtimePlumberSchoolTest EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L); EasyMock.expect(row.getDimensions()).andReturn(new ArrayList()); EasyMock.replay(row); - plumber.add(row); + plumber.add(row, Committers.supplierOf(Committers.nil())); plumber.persist( - new Runnable() - { - @Override - public void run() - { - committed.setValue(true); - throw new RuntimeException(); - } - } + Committers.supplierFromRunnable( + new Runnable() + { + @Override + public void run() + { + committed.setValue(true); + throw new RuntimeException(); + } + } + ).get() ); - while (!committed.booleanValue()) { Thread.sleep(100); } + // Exception may need time to propagate + while (metrics.failedPersists() < 1) { + Thread.sleep(100); + } + Assert.assertEquals(1, metrics.failedPersists()); } }