From 9f6bb03ef49c3cd0ad9e62f3ec2996f69cf5143f Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 16 Sep 2015 15:25:34 -0700 Subject: [PATCH] Account for potential gaps in hydrants in sink initialization, hydrant swapping (e.g. h0, h1, h4) --- .../realtime/plumber/RealtimePlumber.java | 8 + .../druid/segment/realtime/plumber/Sink.java | 26 ++- .../plumber/RealtimePlumberSchoolTest.java | 180 +++++++++++++++++- 3 files changed, 199 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 0adca56b2b0..00a89969e6f 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -744,6 +744,14 @@ public class RealtimePlumber implements Plumber ) ); } + if (hydrants.isEmpty()) { + // Probably encountered a corrupt sink directory + log.warn( + "Found persisted segment directory with no intermediate segments present at %s, skipping sink creation.", + sinkDir.getAbsolutePath() + ); + continue; + } Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants); sinks.put(sinkInterval.getStartMillis(), currSink); sinkTimeline.add( diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index b3269a4f589..fceee9aaff5 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -85,11 +85,13 @@ public class Sink implements Iterable this.interval = interval; this.version = version; + int maxCount = -1; for (int i = 0; i < hydrants.size(); ++i) { final FireHydrant hydrant = hydrants.get(i); - if (hydrant.getCount() != i) { + if (hydrant.getCount() <= maxCount) { throw new ISE("hydrant[%s] not the right count[%s]", hydrant, i); } + maxCount = hydrant.getCount(); } this.hydrants.addAll(hydrants); @@ -167,13 +169,13 @@ public class Sink implements Iterable Lists.newArrayList(), Lists.transform( Arrays.asList(schema.getAggregators()), new Function() - { - @Override - public String apply(@Nullable AggregatorFactory input) - { - return input.getName(); - } - } + { + @Override + public String apply(@Nullable AggregatorFactory input) + { + return input.getName(); + } + } ), config.getShardSpec(), null, @@ -208,7 +210,13 @@ public class Sink implements Iterable final FireHydrant old; synchronized (hydrantLock) { old = currHydrant; - currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier()); + int newCount = 0; + int numHydrants = hydrants.size(); + if (numHydrants > 0) { + FireHydrant lastHydrant = hydrants.get(numHydrants - 1); + newCount = lastHydrant.getCount() + 1; + } + currHydrant = new FireHydrant(newIndex, newCount, getSegment().getIdentifier()); hydrants.add(currHydrant); } diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 5f77f3d4907..d298249cbf3 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -22,6 +22,7 @@ package io.druid.segment.realtime.plumber; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Predicate; import com.google.common.base.Suppliers; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Files; import com.google.common.util.concurrent.MoreExecutors; @@ -29,8 +30,10 @@ import com.metamx.common.Granularity; import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.FilteredServerView; import io.druid.client.ServerView; +import io.druid.common.utils.JodaUtils; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; +import io.druid.data.input.Row; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.JSONParseSpec; @@ -49,6 +52,7 @@ import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.FireHydrant; import io.druid.segment.realtime.SegmentPublisher; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; @@ -69,9 +73,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** */ @@ -88,6 +94,7 @@ public class RealtimePlumberSchoolTest private ServiceEmitter emitter; private RealtimeTuningConfig tuningConfig; private DataSchema schema; + private DataSchema schema2; private FireDepartmentMetrics metrics; public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy) @@ -134,6 +141,22 @@ public class RealtimePlumberSchoolTest jsonMapper ); + schema2 = new DataSchema( + "test", + jsonMapper.convertValue( + new StringInputRowParser( + new JSONParseSpec( + new TimestampSpec("timestamp", "auto", null), + new DimensionsSpec(null, null, null) + ) + ), + Map.class + ), + new AggregatorFactory[]{new CountAggregatorFactory("rows")}, + new UniformGranularitySpec(Granularity.YEAR, QueryGranularity.NONE, null), + jsonMapper + ); + announcer = EasyMock.createMock(DataSegmentAnnouncer.class); announcer.announceSegment(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); @@ -212,7 +235,7 @@ public class RealtimePlumberSchoolTest private void testPersist(final Object commitMetadata) throws Exception { - final MutableBoolean committed = new MutableBoolean(false); + final AtomicBoolean committed = new AtomicBoolean(false); plumber.getSinks() .put( 0L, @@ -240,13 +263,13 @@ public class RealtimePlumberSchoolTest @Override public void run() { - committed.setValue(true); + committed.set(true); } }; plumber.add(row, Suppliers.ofInstance(committer)); plumber.persist(committer); - while (!committed.booleanValue()) { + while (!committed.get()) { Thread.sleep(100); } plumber.getSinks().clear(); @@ -256,7 +279,7 @@ public class RealtimePlumberSchoolTest @Test(timeout = 60000) public void testPersistFails() throws Exception { - final MutableBoolean committed = new MutableBoolean(false); + final AtomicBoolean committed = new AtomicBoolean(false); plumber.getSinks() .put( 0L, @@ -280,13 +303,13 @@ public class RealtimePlumberSchoolTest @Override public void run() { - committed.setValue(true); + committed.set(true); throw new RuntimeException(); } } ).get() ); - while (!committed.booleanValue()) { + while (!committed.get()) { Thread.sleep(100); } @@ -297,4 +320,149 @@ public class RealtimePlumberSchoolTest Assert.assertEquals(1, metrics.failedPersists()); } + + @Test(timeout = 60000) + public void testPersistHydrantGaps() throws Exception + { + final Object commitMetadata = "dummyCommitMetadata"; + testPersistHydrantGapsHelper(commitMetadata); + } + + private void testPersistHydrantGapsHelper(final Object commitMetadata) throws Exception + { + final AtomicBoolean committed = new AtomicBoolean(false); + Interval testInterval = new Interval(new DateTime("1970-01-01"), new DateTime("1971-01-01")); + + RealtimePlumber plumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics); + plumber2.getSinks() + .put( + 0L, + new Sink( + testInterval, + schema2, + tuningConfig, + new DateTime("2014-12-01T12:34:56.789").toString() + ) + ); + Assert.assertNull(plumber2.startJob()); + + final Committer committer = new Committer() + { + @Override + public Object getMetadata() + { + return commitMetadata; + } + + @Override + public void run() + { + committed.set(true); + } + }; + plumber2.add(getTestInputRow("1970-01-01"), Suppliers.ofInstance(committer)); + plumber2.add(getTestInputRow("1970-02-01"), Suppliers.ofInstance(committer)); + plumber2.add(getTestInputRow("1970-03-01"), Suppliers.ofInstance(committer)); + plumber2.add(getTestInputRow("1970-04-01"), Suppliers.ofInstance(committer)); + plumber2.add(getTestInputRow("1970-05-01"), Suppliers.ofInstance(committer)); + + plumber2.persist(committer); + + while (!committed.get()) { + Thread.sleep(100); + } + plumber2.getSinks().clear(); + plumber2.finishJob(); + + File persistDir = plumber2.computePersistDir(schema2, testInterval); + + /* Check that all hydrants were persisted */ + for (int i = 0; i < 5; i ++) { + Assert.assertTrue(new File(persistDir, String.valueOf(i)).exists()); + } + + /* Create some gaps in the persisted hydrants and reload */ + FileUtils.deleteDirectory(new File(persistDir, "1")); + FileUtils.deleteDirectory(new File(persistDir, "3")); + RealtimePlumber restoredPlumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics); + restoredPlumber.bootstrapSinksFromDisk(); + + Map sinks = restoredPlumber.getSinks(); + Assert.assertEquals(1, sinks.size()); + + List hydrants = Lists.newArrayList(sinks.get(new Long(0))); + DateTime startTime = new DateTime("1970-01-01T00:00:00.000Z"); + Assert.assertEquals(0, hydrants.get(0).getCount()); + Assert.assertEquals(new Interval(startTime, new DateTime("1970-01-01T00:00:00.001Z")), + hydrants.get(0).getSegment().getDataInterval()); + Assert.assertEquals(2, hydrants.get(1).getCount()); + Assert.assertEquals(new Interval(startTime, new DateTime("1970-03-01T00:00:00.001Z")), + hydrants.get(1).getSegment().getDataInterval()); + Assert.assertEquals(4, hydrants.get(2).getCount()); + Assert.assertEquals(new Interval(startTime, new DateTime("1970-05-01T00:00:00.001Z")), + hydrants.get(2).getSegment().getDataInterval()); + + /* Delete all the hydrants and reload, no sink should be created */ + FileUtils.deleteDirectory(new File(persistDir, "0")); + FileUtils.deleteDirectory(new File(persistDir, "2")); + FileUtils.deleteDirectory(new File(persistDir, "4")); + RealtimePlumber restoredPlumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics); + restoredPlumber2.bootstrapSinksFromDisk(); + + Assert.assertEquals(0, restoredPlumber2.getSinks().size()); + } + + private InputRow getTestInputRow(final String timeStr) { + return new InputRow() + { + @Override + public List getDimensions() + { + return Lists.newArrayList(); + } + + @Override + public long getTimestampFromEpoch() + { + return new DateTime(timeStr).getMillis(); + } + + @Override + public DateTime getTimestamp() + { + return new DateTime(timeStr); + } + + @Override + public List getDimension(String dimension) + { + return Lists.newArrayList(); + } + + @Override + public float getFloatMetric(String metric) + { + return 0; + } + + @Override + public long getLongMetric(String metric) + { + return 0L; + } + + @Override + public Object getRaw(String dimension) + { + return null; + } + + @Override + public int compareTo(Row o) + { + return 0; + } + }; + } + }