From 09ad32c5c563c7f8482f9a5170ad07320d7976c5 Mon Sep 17 00:00:00 2001 From: fjy Date: Fri, 16 May 2014 15:15:10 -0700 Subject: [PATCH 1/3] fix race condition with merge and persist and sink adding Conflicts: indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java server/src/main/java/io/druid/segment/realtime/RealtimeManager.java server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java --- .../common/index/YeOldePlumberSchool.java | 11 +++++ .../druid/indexing/common/task/IndexTask.java | 8 ++-- .../common/task/RealtimeIndexTask.java | 9 +--- .../segment/realtime/RealtimeManager.java | 11 ++--- .../segment/realtime/plumber/Plumber.java | 2 + .../realtime/plumber/RealtimePlumber.java | 12 +++++ .../druid/segment/realtime/plumber/Sink.java | 44 ++++++++++--------- .../segment/realtime/RealtimeManagerTest.java | 15 +++++++ .../plumber/RealtimePlumberSchoolTest.java | 10 ----- .../segment/realtime/plumber/SinkTest.java | 6 +-- 10 files changed, 75 insertions(+), 53 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index a57c025eef0..11f6bb2264d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -31,6 +31,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.metamx.common.Granularity; import com.metamx.common.logger.Logger; +import io.druid.data.input.InputRow; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.segment.IndexIO; @@ -112,6 +113,16 @@ public class YeOldePlumberSchool implements PlumberSchool } @Override + public int add(InputRow row) + { + Sink sink = getSink(row.getTimestampFromEpoch()); + if (sink == null) { + return -1; + } + + return sink.add(row); + } + public Sink getSink(long timestamp) { if (theSink.getInterval().contains(timestamp)) { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 21b23958a2b..f406c747a70 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -398,17 +398,15 @@ public class IndexTask extends AbstractFixedIntervalTask final InputRow inputRow = firehose.nextRow(); if (shouldIndex(shardSpec, interval, inputRow)) { - final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); - if (sink == null) { - throw new NullPointerException( + int numRows = plumber.add(inputRow); + if (numRows == -1) { + throw new ISE( String.format( "Was expecting non-null sink for timestamp[%s]", new DateTime(inputRow.getTimestampFromEpoch()) ) ); } - - int numRows = sink.add(inputRow); metrics.incrementProcessed(); if (numRows >= myRowFlushBoundary) { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 4bc4b9d9721..be81698170b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -335,8 +335,8 @@ public class RealtimeIndexTask extends AbstractTask continue; } - final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); - if (sink == null) { + int currCount = plumber.add(inputRow); + if (currCount == -1) { fireDepartment.getMetrics().incrementThrownAway(); log.debug("Throwing away event[%s]", inputRow); @@ -348,11 +348,6 @@ public class RealtimeIndexTask extends AbstractTask continue; } - if (sink.isEmpty()) { - log.info("Task %s: New sink: %s", getId(), sink); - } - - int currCount = sink.add(inputRow); fireDepartment.getMetrics().incrementProcessed(); if (currCount >= tuningConfig.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) { plumber.persist(firehose.commit()); diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index daf6a4252aa..f83a1e3911f 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -43,7 +43,6 @@ import io.druid.query.SegmentDescriptor; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.plumber.Plumber; -import io.druid.segment.realtime.plumber.Sink; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; @@ -197,8 +196,8 @@ public class RealtimeManager implements QuerySegmentWalker continue; } - final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); - if (sink == null) { + int currCount = plumber.add(inputRow); + if (currCount == -1) { metrics.incrementThrownAway(); log.debug("Throwing away event[%s]", inputRow); @@ -209,8 +208,6 @@ public class RealtimeManager implements QuerySegmentWalker continue; } - - int currCount = sink.add(inputRow); if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) { plumber.persist(firehose.commit()); nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); @@ -225,13 +222,11 @@ public class RealtimeManager implements QuerySegmentWalker } } catch (RuntimeException e) { - log.makeAlert( e, "RuntimeException aborted realtime processing[%s]", fireDepartment.getDataSchema().getDataSource() - ) - .emit(); + ).emit(); normalExit = false; throw e; } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java index bce5086de65..a64680d046d 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java @@ -19,6 +19,7 @@ package io.druid.segment.realtime.plumber; +import io.druid.data.input.InputRow; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -30,6 +31,7 @@ public interface Plumber */ public void startJob(); + public int add(InputRow row); public Sink getSink(long timestamp); public QueryRunner getQueryRunner(Query query); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 688a620e4e6..65592008336 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -21,6 +21,7 @@ import io.druid.client.ServerView; import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingRunnable; import io.druid.concurrent.Execs; +import io.druid.data.input.InputRow; import io.druid.query.MetricsEmittingQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -72,6 +73,7 @@ public class RealtimePlumber implements Plumber private final DataSchema schema; private final RealtimeTuningConfig config; + private final RejectionPolicy rejectionPolicy; private final FireDepartmentMetrics metrics; private final ServiceEmitter emitter; @@ -151,6 +153,16 @@ public class RealtimePlumber implements Plumber } @Override + public int add(InputRow row) + { + final Sink sink = getSink(row.getTimestampFromEpoch()); + if (sink == null) { + return -1; + } + + return sink.add(row); + } + public Sink getSink(long timestamp) { if (!rejectionPolicy.accept(timestamp)) { diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index ab89c880879..609935d81b1 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -50,7 +50,7 @@ public class Sink implements Iterable { private static final Logger log = new Logger(Sink.class); - private volatile FireHydrant currIndex; + private volatile FireHydrant currHydrant; private final Interval interval; private final DataSchema schema; @@ -107,31 +107,35 @@ public class Sink implements Iterable return interval; } - public FireHydrant getCurrIndex() + public FireHydrant getCurrHydrant() { - return currIndex; + return currHydrant; } public int add(InputRow row) { - if (currIndex == null) { - throw new IAE("No currIndex but given row[%s]", row); + if (currHydrant == null) { + throw new IAE("No currHydrant but given row[%s]", row); } - synchronized (currIndex) { - return currIndex.getIndex().add(row); + synchronized (currHydrant) { + IncrementalIndex index = currHydrant.getIndex(); + if (index == null) { + return -1; // the hydrant was swapped without being replaced + } + return index.add(row); } } public boolean isEmpty() { - synchronized (currIndex) { - return hydrants.size() == 1 && currIndex.getIndex().isEmpty(); + synchronized (currHydrant) { + return hydrants.size() == 1 && currHydrant.getIndex().isEmpty(); } } /** - * If currIndex is A, creates a new index B, sets currIndex to B and returns A. + * If currHydrant is A, creates a new index B, sets currHydrant to B and returns A. * * @return the current index after swapping in a new one */ @@ -142,8 +146,8 @@ public class Sink implements Iterable public boolean swappable() { - synchronized (currIndex) { - return currIndex.getIndex() != null && currIndex.getIndex().size() != 0; + synchronized (currHydrant) { + return currHydrant.getIndex() != null && currHydrant.getIndex().size() != 0; } } @@ -189,15 +193,15 @@ public class Sink implements Iterable ); FireHydrant old; - if (currIndex == null) { // Only happens on initialization, cannot synchronize on null - old = currIndex; - currIndex = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier()); - hydrants.add(currIndex); + if (currHydrant == null) { // Only happens on initialization, cannot synchronize on null + old = currHydrant; + currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier()); + hydrants.add(currHydrant); } else { - synchronized (currIndex) { - old = currIndex; - currIndex = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier()); - hydrants.add(currIndex); + synchronized (currHydrant) { + old = currHydrant; + currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier()); + hydrants.add(currHydrant); } } diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 8b4301ee82a..e29dd187b4b 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -258,6 +258,21 @@ public class RealtimeManagerTest } @Override + public int add(InputRow row) + { + if (row == null) { + return -1; + } + + Sink sink = getSink(row.getTimestampFromEpoch()); + + if (sink == null) { + return -1; + } + + return sink.add(row); + } + public Sink getSink(long timestamp) { if (sink.getInterval().contains(timestamp)) { diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 3140b57512f..884baeb1054 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -161,16 +161,6 @@ public class RealtimePlumberSchoolTest EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter); } - @Test - public void testGetSink() throws Exception - { - final DateTime theTime = new DateTime("2013-01-01"); - Sink sink = plumber.getSink(theTime.getMillis()); - - Assert.assertEquals(new Interval(String.format("%s/PT1H", theTime.toString())), sink.getInterval()); - Assert.assertEquals(theTime.toString(), sink.getVersion()); - } - @Test public void testPersist() throws Exception { diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index 4aa3b946068..b068a994cb7 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -101,7 +101,7 @@ public class SinkTest } ); - FireHydrant currHydrant = sink.getCurrIndex(); + FireHydrant currHydrant = sink.getCurrHydrant(); Assert.assertEquals(new Interval("2013-01-01/PT1M"), currHydrant.getIndex().getInterval()); @@ -143,8 +143,8 @@ public class SinkTest ); Assert.assertEquals(currHydrant, swapHydrant); - Assert.assertNotSame(currHydrant, sink.getCurrIndex()); - Assert.assertEquals(new Interval("2013-01-01/PT1M"), sink.getCurrIndex().getIndex().getInterval()); + Assert.assertNotSame(currHydrant, sink.getCurrHydrant()); + Assert.assertEquals(new Interval("2013-01-01/PT1M"), sink.getCurrHydrant().getIndex().getInterval()); Assert.assertEquals(2, Iterators.size(sink.iterator())); } From 389319dd6e827f73f360896e102f0dbe4912954d Mon Sep 17 00:00:00 2001 From: fjy Date: Fri, 16 May 2014 15:32:58 -0700 Subject: [PATCH 2/3] fix for code review comments --- .../main/java/io/druid/segment/realtime/plumber/Plumber.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java index a64680d046d..31da9599461 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java @@ -31,8 +31,11 @@ public interface Plumber */ public void startJob(); + /** + * @param row - the row to insert + * @return - indicating how many rows were added, -1 means that an error occurred + */ public int add(InputRow row); - public Sink getSink(long timestamp); public QueryRunner getQueryRunner(Query query); /** From 68618ffc7536c937cf4d840255a17e53f69e1db4 Mon Sep 17 00:00:00 2001 From: fjy Date: Fri, 16 May 2014 15:38:28 -0700 Subject: [PATCH 3/3] fix another code review comment --- .../main/java/io/druid/segment/realtime/plumber/Plumber.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java index 31da9599461..136d3a8a253 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java @@ -33,7 +33,8 @@ public interface Plumber /** * @param row - the row to insert - * @return - indicating how many rows were added, -1 means that an error occurred + * @return - positive numbers indicate how many summarized rows exist in the index for that timestamp, + * -1 means a row was thrown away because it was too late */ public int add(InputRow row); public QueryRunner getQueryRunner(Query query);