From c12c16385ea97189e014dd4a8beb3b6b75ab5168 Mon Sep 17 00:00:00 2001
From: kaijianding
Date: Sat, 5 May 2018 01:12:25 +0800
Subject: [PATCH] support throwing away duplicate rows during realtime
 ingestion in RealtimePlumber (#5693)

---
 docs/content/ingestion/stream-pull.md          |  1 +
 docs/content/operations/metrics.md             |  1 +
 .../src/main/resources/defaultMetrics.json     |  3 +
 .../resources/defaultMetricDimensions.json     |  1 +
 .../common/index/YeOldePlumberSchool.java      | 12 +--
 .../common/task/RealtimeIndexTaskTest.java     |  1 +
 .../indexing/common/task/TaskSerdeTest.java    |  1 +
 .../indexing/overlord/TaskLifecycleTest.java   |  1 +
 .../IncrementalIndexAddResult.java             | 25 +++++-
 .../indexing/RealtimeTuningConfig.java         | 23 ++++-
 .../realtime/FireDepartmentMetrics.java        | 13 +++
 .../segment/realtime/RealtimeManager.java      | 12 ++-
 .../realtime/RealtimeMetricsMonitor.java       |  6 ++
 .../appenderator/AppenderatorImpl.java         |  4 +-
 .../appenderator/AppenderatorPlumber.java      | 17 ++--
 .../segment/realtime/plumber/Plumber.java      | 11 ++-
 .../segment/realtime/plumber/Plumbers.java     | 15 +++-
 .../realtime/plumber/RealtimePlumber.java      | 42 ++++++---
 .../druid/segment/realtime/plumber/Sink.java   | 62 ++++++++++++-
 .../segment/realtime/RealtimeManagerTest.java  | 18 ++--
 .../appenderator/AppenderatorPlumberTest.java  |  7 +-
 .../appenderator/AppenderatorTester.java       |  1 +
 ...DefaultOfflineAppenderatorFactoryTest.java  |  1 +
 .../plumber/RealtimePlumberSchoolTest.java     | 10 ++-
 .../segment/realtime/plumber/SinkTest.java     | 90 ++++++++++++++++++-
 .../cli/validate/DruidJsonValidatorTest.java   |  1 +
 26 files changed, 320 insertions(+), 59 deletions(-)

diff --git a/docs/content/ingestion/stream-pull.md b/docs/content/ingestion/stream-pull.md
index 1bd33caad82..9fbd00d895f 100644
--- a/docs/content/ingestion/stream-pull.md
+++ b/docs/content/ingestion/stream-pull.md
@@ -156,6 +156,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |handoffConditionTimeout|long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)|
 |alertTimeout|long|Milliseconds timeout after which an alert is created if the task isn't finished by then. This allows users to monitor tasks that are failing to finish and give up the worker slot for any unexpected errors.|no (default == 0)|
 |segmentWriteOutMediumFactory|String|Segment write-out medium to use when creating segments. See [Indexing Service Configuration](../configuration/indexing-service.html) page, "SegmentWriteOutMediumFactory" section for explanation and available options.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory` is used)|
+|dedupColumn|String|The column used to decide whether a row duplicates one already ingested into this segment; duplicate rows are thrown away. For String-typed columns, a long-typed hash code of the column value is kept instead of the value itself to reduce heap cost, so there is a very small chance that a hash collision throws away a row that was never ingested before.|no (default == null)|
 |indexSpec|Object|Tune how data is indexed. See below for more information.|no|
 
 Before enabling thread priority settings, users are highly encouraged to read the [original pull request](https://github.com/druid-io/druid/pull/984) and other documentation about proper use of `-XX:+UseThreadPriorities`.
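The new `dedupColumn` property rides on the existing tuningConfig plumbing, so it can also be set when constructing a RealtimeTuningConfig in code. A minimal sketch, not part of the patch: argument order mirrors the 18-argument @JsonCreator constructor this patch extends (dedupColumn last, as in the SinkTest call near the end of the diff), "uniqueId" is a hypothetical column name, and nulls fall back to the documented defaults. It assumes imports of io.druid.segment.indexing.RealtimeTuningConfig and org.joda.time.Period.

    // Sketch only: builds a tuning config with dedup enabled.
    private static RealtimeTuningConfig dedupTuningConfig()
    {
      return new RealtimeTuningConfig(
          100,                       // maxRowsInMemory
          null, new Period("P1Y"), null, null, null, null, null, null,
          null, null, 0, 0, null, null, null, null,
          "uniqueId"                 // dedupColumn (hypothetical column name)
      );
    }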
diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md
index 31033cedd58..95e505d73d0 100644
--- a/docs/content/operations/metrics.md
+++ b/docs/content/operations/metrics.md
@@ -107,6 +107,7 @@ These metrics are only available if the RealtimeMetricsMonitor is included in th
 |------|-----------|----------|------------|
 |`ingest/events/thrownAway`|Number of events rejected because they are outside the windowPeriod.|dataSource.|0|
 |`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|dataSource.|0|
+|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|dataSource.|0|
 |`ingest/events/processed`|Number of events successfully processed per emission period.|dataSource.|Equal to your # of events per emission period.|
 |`ingest/rows/output`|Number of Druid rows persisted.|dataSource.|Your # of events with rollup.|
 |`ingest/persists/count`|Number of times persist occurred.|dataSource.|Depends on configuration.|
diff --git a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
index f73541c17f3..af183b58f67 100644
--- a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
+++ b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
@@ -54,6 +54,9 @@
   "ingest/events/unparseable": [
     "dataSource"
   ],
+  "ingest/events/duplicate": [
+    "dataSource"
+  ],
   "ingest/events/processed": [
     "dataSource"
   ],
diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index 1f9d6d63190..b1026e45036 100644
--- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -32,6 +32,7 @@
   "ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" : "count" },
   "ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" : "count" },
+  "ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" : "count" },
   "ingest/events/processed" : { "dimensions" : ["dataSource"], "type" : "count" },
   "ingest/rows/output" : { "dimensions" : ["dataSource"], "type" : "count" },
   "ingest/persist/count" : { "dimensions" : ["dataSource"], "type" : "count" },
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
index 91d8e6b133b..8379bea7e9d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
@@ -40,6 +40,7 @@ import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
 import io.druid.segment.QueryableIndex;
 import io.druid.segment.SegmentUtils;
+import io.druid.segment.incremental.IncrementalIndexAddResult;
 import io.druid.segment.incremental.IndexSizeExceededException;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
@@ -107,7 +108,8 @@ public class YeOldePlumberSchool implements PlumberSchool
         version,
         config.getMaxRowsInMemory(),
         TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
-        config.isReportParseExceptions()
+        config.isReportParseExceptions(),
+        config.getDedupColumn()
     );
 
     // Temporary directory to hold spilled segments.
@@ -125,20 +127,20 @@ public class YeOldePlumberSchool implements PlumberSchool
       }
 
       @Override
-      public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
+      public IncrementalIndexAddResult add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
       {
         Sink sink = getSink(row.getTimestampFromEpoch());
         if (sink == null) {
-          return -1;
+          return Plumber.THROWAWAY;
         }
 
-        final int numRows = sink.add(row, false).getRowCount();
+        final IncrementalIndexAddResult addResult = sink.add(row, false);
 
         if (!sink.canAppendRow()) {
           persist(committerSupplier.get());
         }
 
-        return numRows;
+        return addResult;
       }
 
       private Sink getSink(long timestamp)
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 9d02ed1c63e..7b128dd9bf7 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -919,6 +919,7 @@ public class RealtimeIndexTaskTest
         reportParseExceptions,
         handoffTimeout,
         null,
+        null,
         null
     );
     return new RealtimeIndexTask(
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
index 545bb9ec08e..15af1eb8e83 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
@@ -554,6 +554,7 @@ public class TaskSerdeTest
             true,
             null,
             null,
+            null,
             null
         )
     ),
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
index 091dff01dba..ae847a597b6 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
@@ -1278,6 +1278,7 @@ public class TaskLifecycleTest
         null,
         null,
         null,
+        null,
         null
     );
     FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAddResult.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAddResult.java
index e76d3c15a1e..94384234bb3 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAddResult.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAddResult.java
@@ -30,6 +30,21 @@ public class IncrementalIndexAddResult
   @Nullable
   private final ParseException parseException;
 
+  @Nullable
+  private final String reasonOfNotAdded;
+
+  public IncrementalIndexAddResult(
+      int rowCount,
+      long bytesInMemory,
+      @Nullable ParseException parseException,
+      @Nullable String reasonOfNotAdded
+  )
+  {
+    this.rowCount = rowCount;
+    this.bytesInMemory = bytesInMemory;
+    this.parseException = parseException;
+    this.reasonOfNotAdded = reasonOfNotAdded;
+  }
 
   public IncrementalIndexAddResult(
       int rowCount,
@@ -37,9 +52,7 @@ public class IncrementalIndexAddResult
       @Nullable ParseException parseException
   )
   {
-    this.rowCount = rowCount;
-    this.bytesInMemory = bytesInMemory;
-    this.parseException = parseException;
+    this(rowCount, bytesInMemory, parseException, null);
   }
 
   public int getRowCount()
@@ -57,4 +70,10 @@ public class IncrementalIndexAddResult
   {
     return parseException;
   }
+
+  @Nullable
+  public String getReasonOfNotAdded()
+  {
+    return reasonOfNotAdded;
+  }
 }
diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java
index f3f3ae6a1d9..5a018eeac2c 100644
--- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java
+++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java
@@ -53,6 +53,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
   private static final Boolean defaultReportParseExceptions = Boolean.FALSE;
   private static final long defaultHandoffConditionTimeout = 0;
   private static final long defaultAlertTimeout = 0;
+  private static final String defaultDedupColumn = null;
 
   private static File createNewBasePersistDirectory()
   {
@@ -87,7 +88,8 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
         defaultReportParseExceptions,
         defaultHandoffConditionTimeout,
         defaultAlertTimeout,
-        null
+        null,
+        defaultDedupColumn
     );
   }
 
@@ -108,6 +110,8 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
   private final long alertTimeout;
   @Nullable
   private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
+  @Nullable
+  private final String dedupColumn;
 
   @JsonCreator
   public RealtimeTuningConfig(
@@ -128,7 +132,8 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
       @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
      @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
       @JsonProperty("alertTimeout") Long alertTimeout,
-      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @JsonProperty("dedupColumn") @Nullable String dedupColumn
   )
   {
     this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
@@ -160,6 +165,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
     this.alertTimeout = alertTimeout == null ? defaultAlertTimeout : alertTimeout;
     Preconditions.checkArgument(this.alertTimeout >= 0, "alertTimeout must be >= 0");
     this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
+    this.dedupColumn = dedupColumn == null ? defaultDedupColumn : dedupColumn;
   }
 
   @Override
@@ -276,6 +282,13 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
     return segmentWriteOutMediumFactory;
   }
 
+  @JsonProperty
+  @Nullable
+  public String getDedupColumn()
+  {
+    return dedupColumn;
+  }
+
   public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
   {
     return new RealtimeTuningConfig(
@@ -295,7 +308,8 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
         reportParseExceptions,
         handoffConditionTimeout,
         alertTimeout,
-        segmentWriteOutMediumFactory
+        segmentWriteOutMediumFactory,
+        dedupColumn
     );
   }
 
@@ -318,7 +332,8 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
         reportParseExceptions,
         handoffConditionTimeout,
         alertTimeout,
-        segmentWriteOutMediumFactory
+        segmentWriteOutMediumFactory,
+        dedupColumn
     );
   }
 }
diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java b/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java
index 302b58663ce..77e06ef2cd8 100644
--- a/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java
+++ b/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java
@@ -31,6 +31,7 @@ public class FireDepartmentMetrics
   private final AtomicLong processedWithErrorsCount = new AtomicLong(0);
   private final AtomicLong thrownAwayCount = new AtomicLong(0);
   private final AtomicLong unparseableCount = new AtomicLong(0);
+  private final AtomicLong dedupCount = new AtomicLong(0);
   private final AtomicLong rowOutputCount = new AtomicLong(0);
   private final AtomicLong numPersists = new AtomicLong(0);
   private final AtomicLong persistTimeMillis = new AtomicLong(0);
@@ -60,6 +61,11 @@ public class FireDepartmentMetrics
     thrownAwayCount.incrementAndGet();
   }
 
+  public void incrementDedup()
+  {
+    dedupCount.incrementAndGet();
+  }
+
   public void incrementUnparseable()
   {
     unparseableCount.incrementAndGet();
@@ -145,6 +151,11 @@ public class FireDepartmentMetrics
     return unparseableCount.get();
   }
 
+  public long dedup()
+  {
+    return dedupCount.get();
+  }
+
   public long rowOutput()
   {
     return rowOutputCount.get();
@@ -217,6 +228,7 @@ public class FireDepartmentMetrics
     retVal.processedWithErrorsCount.set(processedWithErrorsCount.get());
     retVal.thrownAwayCount.set(thrownAwayCount.get());
     retVal.unparseableCount.set(unparseableCount.get());
+    retVal.dedupCount.set(dedupCount.get());
     retVal.rowOutputCount.set(rowOutputCount.get());
     retVal.numPersists.set(numPersists.get());
     retVal.persistTimeMillis.set(persistTimeMillis.get());
@@ -247,6 +259,7 @@ public class FireDepartmentMetrics
     thrownAwayCount.addAndGet(otherSnapshot.thrownAway());
     rowOutputCount.addAndGet(otherSnapshot.rowOutput());
     unparseableCount.addAndGet(otherSnapshot.unparseable());
+    dedupCount.addAndGet(otherSnapshot.dedup());
     numPersists.addAndGet(otherSnapshot.numPersists());
     persistTimeMillis.addAndGet(otherSnapshot.persistTimeMillis());
     persistBackPressureMillis.addAndGet(otherSnapshot.persistBackPressureMillis());
diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java
index 989f50012b9..5621385a34a 100644
--- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java
+++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java
@@ -49,6 +49,7 @@ import io.druid.query.QuerySegmentWalker;
 import io.druid.query.QueryToolChest;
 import io.druid.query.SegmentDescriptor;
 import io.druid.query.spec.SpecificSegmentSpec;
+import io.druid.segment.incremental.IncrementalIndexAddResult;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.realtime.plumber.Committers;
@@ -335,14 +336,17 @@ public class RealtimeManager implements QuerySegmentWalker
           return false;
         }
         InputRow inputRow = null;
-        int numRows = 0;
         try {
           inputRow = firehose.currRow();
           if (inputRow != null) {
-            numRows = plumber.add(inputRow, committerSupplier);
-            if (numRows < 0) {
+            IncrementalIndexAddResult addResult = plumber.add(inputRow, committerSupplier);
+            int numRows = addResult.getRowCount();
+            if (numRows == -2) {
+              metrics.incrementDedup();
+              log.debug("Throwing away duplicate event[%s]", inputRow);
+            } else if (numRows < 0) {
               metrics.incrementThrownAway();
-              log.debug("Throwing away event[%s]", inputRow);
+              log.debug("Throwing away event[%s] due to %s", inputRow, addResult.getReasonOfNotAdded());
             } else {
               metrics.incrementProcessed();
             }
diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java b/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java
index f65ee0627c7..38f82326792 100644
--- a/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java
+++ b/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java
@@ -80,6 +80,12 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
       log.error("[%,d] Unparseable events! Turn on debug logging to see exception stack trace.", unparseable);
     }
     emitter.emit(builder.build("ingest/events/unparseable", unparseable));
+    final long dedup = metrics.dedup() - previous.dedup();
+    if (dedup > 0) {
+      log.warn("[%,d] duplicate events!", dedup);
+    }
+    emitter.emit(builder.build("ingest/events/duplicate", dedup));
+
     emitter.emit(builder.build("ingest/events/processed", metrics.processed() - previous.processed()));
     emitter.emit(builder.build("ingest/rows/output", metrics.rowOutput() - previous.rowOutput()));
     emitter.emit(builder.build("ingest/persists/count", metrics.numPersists() - previous.numPersists()));
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index cd2ecd9ac69..e5c2954f2d6 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -355,7 +355,8 @@ public class AppenderatorImpl implements Appenderator
           identifier.getVersion(),
           tuningConfig.getMaxRowsInMemory(),
           TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
-          tuningConfig.isReportParseExceptions()
+          tuningConfig.isReportParseExceptions(),
+          null
      );
 
       try {
@@ -1027,6 +1028,7 @@ public class AppenderatorImpl implements Appenderator
           tuningConfig.getMaxRowsInMemory(),
           TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
           tuningConfig.isReportParseExceptions(),
+          null,
           hydrants
       );
       sinks.put(identifier, currSink);
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java
index a3683d0950a..6e852a74791 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java
@@ -29,20 +29,21 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.common.guava.ThreadRenamingCallable;
-import io.druid.java.util.common.concurrent.Execs;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.concurrent.ScheduledExecutors;
 import io.druid.java.util.common.granularity.Granularity;
 import io.druid.java.util.common.guava.Sequence;
+import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.query.Query;
 import io.druid.query.QueryPlus;
 import io.druid.query.QueryRunner;
+import io.druid.segment.incremental.IncrementalIndexAddResult;
 import io.druid.segment.incremental.IndexSizeExceededException;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
@@ -145,23 +146,21 @@ public class AppenderatorPlumber implements Plumber
   }
 
   @Override
-  public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
+  public IncrementalIndexAddResult add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
   {
     final SegmentIdentifier identifier = getSegmentIdentifier(row.getTimestampFromEpoch());
     if (identifier == null) {
-      return -1;
+      return Plumber.THROWAWAY;
     }
 
-    final int numRows;
-
     try {
-      numRows = appenderator.add(identifier, row, committerSupplier).getNumRowsInSegment();
+      final Appenderator.AppenderatorAddResult addResult = appenderator.add(identifier, row, committerSupplier);
       lastCommitterSupplier = committerSupplier;
-      return numRows;
+      return new IncrementalIndexAddResult(addResult.getNumRowsInSegment(), 0, addResult.getParseException());
     }
     catch (SegmentNotWritableException e) {
       // Segment already started handoff
-      return -1;
+      return Plumber.NOT_WRITABLE;
     }
   }
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java
index d9ff2342d3b..5cf43535a5a 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java
@@ -24,10 +24,15 @@ import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
+import io.druid.segment.incremental.IncrementalIndexAddResult;
 import io.druid.segment.incremental.IndexSizeExceededException;
 
 public interface Plumber
 {
+  IncrementalIndexAddResult THROWAWAY = new IncrementalIndexAddResult(-1, -1, null, "row too late");
+  IncrementalIndexAddResult NOT_WRITABLE = new IncrementalIndexAddResult(-1, -1, null, "not writable");
+  IncrementalIndexAddResult DUPLICATE = new IncrementalIndexAddResult(-2, -1, null, "duplicate row");
+
   /**
    * Perform any initial setup. Should be called before using any other methods, and should be paired
    * with a corresponding call to {@link #finishJob}.
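Spelled out, the contract these shared constants establish for Plumber.add() is: a non-negative row count means the row was ingested, -1 means it was thrown away (too late, or the sink was not writable), and -2 means it was dropped as a duplicate. A minimal caller-side sketch of that convention, mirroring the RealtimeManager change earlier in this patch; plumber, row, committerSupplier, and metrics are assumed to be in scope:

    IncrementalIndexAddResult addResult = plumber.add(row, committerSupplier);
    int numRows = addResult.getRowCount();
    if (numRows == -2) {
      metrics.incrementDedup();      // duplicate value in dedupColumn
    } else if (numRows < 0) {
      metrics.incrementThrownAway(); // too late, or sink no longer writable
    } else {
      metrics.incrementProcessed();  // numRows summarized rows now in the index
    }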
@@ -40,10 +45,12 @@ public interface Plumber
    * @param row               the row to insert
    * @param committerSupplier supplier of a committer associated with all data that has been added, including this row
    *
-   * @return - positive numbers indicate how many summarized rows exist in the index for that timestamp,
+   * @return IncrementalIndexAddResult whose rowCount
+   * - positive numbers indicate how many summarized rows exist in the index for that timestamp,
    * -1 means a row was thrown away because it was too late
+   * -2 means a row was thrown away because it is a duplicate
    */
-  int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException;
+  IncrementalIndexAddResult add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException;
 
   <T> QueryRunner<T> getQueryRunner(Query<T> query);
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java b/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java
index 5826e5f38e3..ba162c45030 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java
@@ -26,6 +26,7 @@ import io.druid.data.input.InputRow;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.java.util.common.parsers.ParseException;
+import io.druid.segment.incremental.IncrementalIndexAddResult;
 import io.druid.segment.incremental.IndexSizeExceededException;
 import io.druid.segment.realtime.FireDepartmentMetrics;
 
@@ -66,9 +67,9 @@ public class Plumbers
       return;
     }
 
-    final int numRows;
+    final IncrementalIndexAddResult addResult;
     try {
-      numRows = plumber.add(inputRow, committerSupplier);
+      addResult = plumber.add(inputRow, committerSupplier);
     }
     catch (IndexSizeExceededException e) {
       // Shouldn't happen if this is only being called by a single thread.
@@ -76,9 +77,15 @@ public class Plumbers
       throw new ISE(e, "WTF?! Index size exceeded, this shouldn't happen. Bad Plumber!");
     }
 
-    if (numRows == -1) {
+    if (addResult.getRowCount() == -1) {
       metrics.incrementThrownAway();
-      log.debug("Discarded row[%s], considering thrownAway.", inputRow);
+      log.debug("Discarded row[%s], considering thrownAway due to %s.", inputRow, addResult.getReasonOfNotAdded());
+      return;
+    }
+
+    if (addResult.getRowCount() == -2) {
+      metrics.incrementDedup();
+      log.debug("Discarded row[%s], considering duplication.", inputRow);
       return;
     }
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
index 0521a502f50..c2b60e63a1d 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -30,14 +30,11 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Ints;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
 import io.druid.common.guava.ThreadRenamingCallable;
 import io.druid.common.guava.ThreadRenamingRunnable;
 import io.druid.common.utils.VMUtils;
-import io.druid.java.util.common.concurrent.Execs;
 import io.druid.concurrent.TaskThreadPriority;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
@@ -46,9 +43,12 @@ import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.concurrent.ScheduledExecutors;
 import io.druid.java.util.common.granularity.Granularity;
 import io.druid.java.util.common.io.Closer;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerFactoryConglomerate;
@@ -209,13 +209,13 @@ public class RealtimePlumber implements Plumber
   }
 
   @Override
-  public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
+  public IncrementalIndexAddResult add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
   {
     long messageTimestamp = row.getTimestampFromEpoch();
     final Sink sink = getSink(messageTimestamp);
     metrics.reportMessageMaxTimestamp(messageTimestamp);
     if (sink == null) {
-      return -1;
+      return Plumber.THROWAWAY;
     }
 
     final IncrementalIndexAddResult addResult = sink.add(row, false);
@@ -227,7 +227,7 @@ public class RealtimePlumber implements Plumber
       persist(committerSupplier.get());
     }
 
-    return addResult.getRowCount();
+    return addResult;
   }
 
   private Sink getSink(long timestamp)
@@ -257,7 +257,8 @@ public class RealtimePlumber implements Plumber
           versioningPolicy.getVersion(sinkInterval),
           config.getMaxRowsInMemory(),
           TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
-          config.isReportParseExceptions()
+          config.isReportParseExceptions(),
+          config.getDedupColumn()
       );
       addSink(retVal);
 
@@ -508,6 +509,7 @@ public class RealtimePlumber implements Plumber
     shuttingDown = true;
 
     for (final Map.Entry<Long, Sink> entry : sinks.entrySet()) {
+      entry.getValue().clearDedupCache();
       persistAndMerge(entry.getKey(), entry.getValue());
     }
 
@@ -733,6 +735,7 @@ public class RealtimePlumber implements Plumber
             config.getMaxRowsInMemory(),
             TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
             config.isReportParseExceptions(),
+            config.getDedupColumn(),
             hydrants
         );
         addSink(currSink);
@@ -757,6 +760,7 @@ public class RealtimePlumber implements Plumber
                  .addData("interval", sink.getInterval())
                  .emit();
       }
+      clearDedupCache();
     }
 
   protected void startPersistThread()
@@ -811,16 +815,33 @@ public class RealtimePlumber implements Plumber
     ScheduledExecutors.scheduleAtFixedRate(scheduledExecutor, initialDelay, rate, threadRenamingCallable);
   }
 
-  private void mergeAndPush()
+  private void clearDedupCache()
+  {
+    long minTimestamp = getAllowedMinTime().getMillis();
+
+    for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
+      final Long intervalStart = entry.getKey();
+      if (intervalStart < minTimestamp) {
+        entry.getValue().clearDedupCache();
+      }
+    }
+  }
+
+  private DateTime getAllowedMinTime()
   {
     final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
     final Period windowPeriod = config.getWindowPeriod();
 
     final long windowMillis = windowPeriod.toStandardDuration().getMillis();
-    log.info("Starting merge and push.");
-    DateTime minTimestampAsDate = segmentGranularity.bucketStart(
+    return segmentGranularity.bucketStart(
         DateTimes.utc(Math.max(windowMillis, rejectionPolicy.getCurrMaxTime().getMillis()) - windowMillis)
     );
+  }
+
+  private void mergeAndPush()
+  {
+    log.info("Starting merge and push.");
+    DateTime minTimestampAsDate = getAllowedMinTime();
     long minTimestamp = minTimestampAsDate.getMillis();
 
     log.info(
@@ -835,6 +856,7 @@ public class RealtimePlumber implements Plumber
       if (intervalStart < minTimestamp) {
         log.info("Adding entry [%s] for merge and push.", entry);
         sinksToPush.add(entry);
+        entry.getValue().clearDedupCache();
       } else {
         log.info(
             "Skipping persist and merge for entry [%s] : Start time [%s] >= [%s] min timestamp required in this run. Segment will be picked up in a future run.",
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java
index 50d0cf2a128..9ee983f8c8d 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java
@@ -45,16 +45,18 @@ import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class Sink implements Iterable<FireHydrant>
 {
-  private static final IncrementalIndexAddResult ADD_FAILED = new IncrementalIndexAddResult(-1, -1, null);
+  private static final IncrementalIndexAddResult ALREADY_SWAPPED = new IncrementalIndexAddResult(-1, -1, null, "write after index swapped");
 
   private final Object hydrantLock = new Object();
   private final Interval interval;
@@ -69,6 +71,8 @@ public class Sink implements Iterable<FireHydrant>
   private final AtomicInteger numRowsExcludingCurrIndex = new AtomicInteger();
   private volatile FireHydrant currHydrant;
   private volatile boolean writable = true;
+  private final String dedupColumn;
+  private final Set<Long> dedupSet = new HashSet<>();
 
   public Sink(
       Interval interval,
@@ -77,7 +81,8 @@ public class Sink implements Iterable<FireHydrant>
       String version,
       int maxRowsInMemory,
       long maxBytesInMemory,
-      boolean reportParseExceptions
+      boolean reportParseExceptions,
+      String dedupColumn
   )
   {
     this.schema = schema;
@@ -87,6 +92,7 @@ public class Sink implements Iterable<FireHydrant>
     this.maxRowsInMemory = maxRowsInMemory;
     this.maxBytesInMemory = maxBytesInMemory;
     this.reportParseExceptions = reportParseExceptions;
+    this.dedupColumn = dedupColumn;
 
     makeNewCurrIndex(interval.getStartMillis(), schema);
   }
@@ -99,6 +105,7 @@ public class Sink implements Iterable<FireHydrant>
       int maxRowsInMemory,
       long maxBytesInMemory,
       boolean reportParseExceptions,
+      String dedupColumn,
       List<FireHydrant> hydrants
   )
   {
@@ -109,6 +116,7 @@ public class Sink implements Iterable<FireHydrant>
     this.maxRowsInMemory = maxRowsInMemory;
     this.maxBytesInMemory = maxBytesInMemory;
     this.reportParseExceptions = reportParseExceptions;
+    this.dedupColumn = dedupColumn;
 
     int maxCount = -1;
     for (int i = 0; i < hydrants.size(); ++i) {
@@ -130,6 +138,11 @@ public class Sink implements Iterable<FireHydrant>
     makeNewCurrIndex(interval.getStartMillis(), schema);
   }
 
+  public void clearDedupCache()
+  {
+    dedupSet.clear();
+  }
+
   public String getVersion()
   {
     return version;
@@ -153,13 +166,18 @@ public class Sink implements Iterable<FireHydrant>
 
     synchronized (hydrantLock) {
       if (!writable) {
-        return ADD_FAILED;
+        return Plumber.NOT_WRITABLE;
       }
 
       IncrementalIndex index = currHydrant.getIndex();
       if (index == null) {
-        return ADD_FAILED; // the hydrant was swapped without being replaced
+        return ALREADY_SWAPPED; // the hydrant was swapped without being replaced
       }
+
+      if (checkInDedupSet(row)) {
+        return Plumber.DUPLICATE;
+      }
+
       return index.add(row, skipMaxRowsInMemoryCheck);
     }
   }
@@ -209,6 +227,7 @@ public class Sink implements Iterable<FireHydrant>
   {
     synchronized (hydrantLock) {
       writable = false;
+      clearDedupCache();
     }
   }
 
@@ -267,6 +286,41 @@ public class Sink implements Iterable<FireHydrant>
     }
   }
 
+  private boolean checkInDedupSet(InputRow row)
+  {
+    if (dedupColumn != null) {
+      Object value = row.getRaw(dedupColumn);
+      if (value != null) {
+        if (value instanceof List) {
+          throw new IAE("Dedup on multi-value field is not supported");
+        }
+        Long pk;
+        if (value instanceof Long || value instanceof Integer) {
+          pk = ((Number) value).longValue();
+        } else {
+          // use a long-typed hash code of the value instead of the value itself
+          // to reduce heap cost; hash collisions are possible, but avoiding OOM
+          // is more important
+          pk = pkHash(String.valueOf(value));
+        }
+        if (dedupSet.contains(pk)) {
+          return true;
+        }
+        dedupSet.add(pk);
+      }
+    }
+    return false;
+  }
+
+  private long pkHash(String s)
+  {
+    long seed = 131; // 31 131 1313 13131 131313 etc.. BKDRHash
+    long hash = 0;
+    for (int i = 0; i < s.length(); i++) {
+      hash = (hash * seed) + s.charAt(i);
+    }
+    return hash;
+  }
+
   private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
   {
     final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java
index 665ce3311fa..25533ac4d1e 100644
--- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java
@@ -65,6 +65,7 @@ import io.druid.query.spec.MultipleSpecificSegmentSpec;
 import io.druid.query.spec.SpecificSegmentQueryRunner;
 import io.druid.query.spec.SpecificSegmentSpec;
 import io.druid.segment.TestHelper;
+import io.druid.segment.incremental.IncrementalIndexAddResult;
 import io.druid.segment.incremental.IndexSizeExceededException;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeIOConfig;
@@ -215,6 +216,7 @@ public class RealtimeManagerTest
         null,
         null,
         null,
+        null,
         null
     );
     plumber = new TestPlumber(new Sink(
@@ -224,7 +226,8 @@ public class RealtimeManagerTest
         DateTimes.nowUtc().toString(),
         tuningConfig.getMaxRowsInMemory(),
         TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
-        tuningConfig.isReportParseExceptions()
+        tuningConfig.isReportParseExceptions(),
+        tuningConfig.getDedupColumn()
     ));
 
     realtimeManager = new RealtimeManager(
@@ -245,7 +248,8 @@ public class RealtimeManagerTest
         DateTimes.nowUtc().toString(),
         tuningConfig.getMaxRowsInMemory(),
         TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
-        tuningConfig.isReportParseExceptions()
+        tuningConfig.isReportParseExceptions(),
+        tuningConfig.getDedupColumn()
     ));
 
     realtimeManager2 = new RealtimeManager(
@@ -277,6 +281,7 @@ public class RealtimeManagerTest
         null,
         null,
         null,
+        null,
         null
     );
 
@@ -297,6 +302,7 @@ public class RealtimeManagerTest
         null,
         null,
         null,
+        null,
         null
     );
 
@@ -1034,19 +1040,19 @@ public class RealtimeManagerTest
     }
 
     @Override
-    public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
+    public IncrementalIndexAddResult add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
     {
       if (row == null) {
-        return -1;
+        return Plumber.THROWAWAY;
       }
 
       Sink sink = getSink(row.getTimestampFromEpoch());
 
      if (sink == null) {
-        return -1;
+        return Plumber.THROWAWAY;
       }
 
-      return sink.add(row, false).getRowCount();
+      return sink.add(row, false);
     }
 
     public Sink getSink(long timestamp)
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
index 32dcb76b0c8..efeaa7e717f 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
@@ -80,6 +80,7 @@ public class AppenderatorPlumberTest
         false,
         null,
         null,
+        null,
         null
     );
 
@@ -111,17 +112,17 @@ public class AppenderatorPlumberTest
     commitMetadata.put("x", "1");
     Assert.assertEquals(
         1,
-        plumber.add(rows[0], null));
+        plumber.add(rows[0], null).getRowCount());
 
     commitMetadata.put("x", "2");
     Assert.assertEquals(
         2,
-        plumber.add(rows[1], null));
+        plumber.add(rows[1], null).getRowCount());
 
     commitMetadata.put("x", "3");
     Assert.assertEquals(
         3,
-        plumber.add(rows[2], null));
+        plumber.add(rows[2], null).getRowCount());
 
     Assert.assertEquals(1, plumber.getSegmentsView().size());
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
index e562e18f191..057eceb90e7 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -159,6 +159,7 @@ public class AppenderatorTester implements AutoCloseable
         null,
         null,
         null,
+        null,
         null
     );
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
index b837bf5ee45..fec7afdaba4 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
@@ -149,6 +149,7 @@ public class DefaultOfflineAppenderatorFactoryTest
         null,
         null,
         null,
+        null,
         null
     );
diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index 10e5bd1085a..5dfc753f686 100644
--- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -212,6 +212,7 @@ public class RealtimePlumberSchoolTest
         false,
         null,
         null,
+        null,
         null
     );
 
@@ -272,7 +273,8 @@ public class RealtimePlumberSchoolTest
         DateTimes.of("2014-12-01T12:34:56.789").toString(),
         tuningConfig.getMaxRowsInMemory(),
         TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
-        tuningConfig.isReportParseExceptions()
+        tuningConfig.isReportParseExceptions(),
+        tuningConfig.getDedupColumn()
     );
     plumber.getSinks().put(0L, sink);
     Assert.assertNull(plumber.startJob());
@@ -317,7 +319,8 @@ public class RealtimePlumberSchoolTest
         DateTimes.of("2014-12-01T12:34:56.789").toString(),
         tuningConfig.getMaxRowsInMemory(),
         TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
-        tuningConfig.isReportParseExceptions()
+        tuningConfig.isReportParseExceptions(),
+        tuningConfig.getDedupColumn()
     );
     plumber.getSinks().put(0L, sink);
     plumber.startJob();
@@ -372,7 +375,8 @@ public class RealtimePlumberSchoolTest
         DateTimes.of("2014-12-01T12:34:56.789").toString(),
         tuningConfig.getMaxRowsInMemory(),
         TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
-        tuningConfig.isReportParseExceptions()
+        tuningConfig.isReportParseExceptions(),
+        tuningConfig.getDedupColumn()
     );
     plumber2.getSinks().put(0L, sink);
     Assert.assertNull(plumber2.startJob());
diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java
index a8bf1be421e..5e624c5890a 100644
--- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java
@@ -19,9 +19,12 @@
 
 package io.druid.segment.realtime.plumber;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import io.druid.data.input.InputRow;
+import io.druid.data.input.MapBasedInputRow;
 import io.druid.data.input.Row;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
@@ -77,6 +80,7 @@ public class SinkTest
         null,
         null,
         null,
+        null,
         null
     );
     final Sink sink = new Sink(
@@ -86,7 +90,8 @@ public class SinkTest
         version,
         tuningConfig.getMaxRowsInMemory(),
         TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
-        tuningConfig.isReportParseExceptions()
+        tuningConfig.isReportParseExceptions(),
+        tuningConfig.getDedupColumn()
     );
 
     sink.add(
@@ -197,4 +202,87 @@ public class SinkTest
 
     Assert.assertEquals(2, Iterators.size(sink.iterator()));
   }
+
+  @Test
+  public void testDedup() throws Exception
+  {
+    final DataSchema schema = new DataSchema(
+        "test",
+        null,
+        new AggregatorFactory[]{new CountAggregatorFactory("rows")},
+        new UniformGranularitySpec(Granularities.HOUR, Granularities.MINUTE, null),
+        null,
+        new DefaultObjectMapper()
+    );
+
+    final Interval interval = Intervals.of("2013-01-01/2013-01-02");
+    final String version = DateTimes.nowUtc().toString();
+    RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
+        100,
+        null,
+        new Period("P1Y"),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        0,
+        0,
+        null,
+        null,
+        null,
+        null,
+        "dedupColumn"
+    );
+    final Sink sink = new Sink(
+        interval,
+        schema,
+        tuningConfig.getShardSpec(),
+        version,
+        tuningConfig.getMaxRowsInMemory(),
+        TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
+        tuningConfig.isReportParseExceptions(),
+        tuningConfig.getDedupColumn()
+    );
+
+    int rows = sink.add(new MapBasedInputRow(
+        DateTimes.of("2013-01-01"),
+        ImmutableList.of("field", "dedupColumn"),
+        ImmutableMap.of("field1", "value1", "dedupColumn", "v1")
+    ), false).getRowCount();
+    Assert.assertTrue(rows > 0);
+
+    // dedupColumn is null
+    rows = sink.add(new MapBasedInputRow(
+        DateTimes.of("2013-01-01"),
+        ImmutableList.of("field", "dedupColumn"),
+        ImmutableMap.of("field1", "value2")
+    ), false).getRowCount();
+    Assert.assertTrue(rows > 0);
+
+    // dedupColumn is null
+    rows = sink.add(new MapBasedInputRow(
+        DateTimes.of("2013-01-01"),
+        ImmutableList.of("field", "dedupColumn"),
+        ImmutableMap.of("field1", "value3")
+    ), false).getRowCount();
+    Assert.assertTrue(rows > 0);
+
+    rows = sink.add(new MapBasedInputRow(
+        DateTimes.of("2013-01-01"),
+        ImmutableList.of("field", "dedupColumn"),
+        ImmutableMap.of("field1", "value4", "dedupColumn", "v2")
+    ), false).getRowCount();
+    Assert.assertTrue(rows > 0);
+
+    rows = sink.add(new MapBasedInputRow(
+        DateTimes.of("2013-01-01"),
+        ImmutableList.of("field", "dedupColumn"),
+        ImmutableMap.of("field1", "value5", "dedupColumn", "v1")
+    ), false).getRowCount();
+    Assert.assertEquals(-2, rows);
+  }
 }
diff --git a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java
index 9417e941290..181719cebd2 100644
--- a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java
+++ b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java
@@ -184,6 +184,7 @@ public class DruidJsonValidatorTest
         true,
         null,
         null,
+        null,
         null
     )
 ),
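The heap-versus-accuracy tradeoff behind Sink.checkInDedupSet() and pkHash() is easiest to see in isolation. The following self-contained sketch, not part of the patch, reimplements just the dedup bookkeeping: a HashSet of longs fed by the same BKDR string hash (seed 131) the patch adds to Sink, showing how repeated values are detected and why a collision can drop a row that was never ingested.

    import java.util.HashSet;
    import java.util.Set;

    class DedupSketch
    {
      private final Set<Long> dedupSet = new HashSet<>();

      // Same BKDR string hash as Sink.pkHash: an 8-byte long is cached per row
      // instead of the full string value, trading a tiny collision risk for heap.
      private static long pkHash(String s)
      {
        long seed = 131;
        long hash = 0;
        for (int i = 0; i < s.length(); i++) {
          hash = (hash * seed) + s.charAt(i);
        }
        return hash;
      }

      /** Returns true if this value (or a colliding one) was already seen. */
      boolean isDuplicate(String value)
      {
        return !dedupSet.add(pkHash(value));
      }

      public static void main(String[] args)
      {
        DedupSketch sketch = new DedupSketch();
        System.out.println(sketch.isDuplicate("v1")); // false: first sighting
        System.out.println(sketch.isDuplicate("v2")); // false: distinct value
        System.out.println(sketch.isDuplicate("v1")); // true: duplicate, row dropped
      }
    }

Note that the patch also clears this cache whenever a sink stops accepting writes, is merged and pushed, or falls outside the window period (the clearDedupCache calls above), so the set only ever holds hashes for segments that can still receive rows.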