Account for potential gaps in hydrant numbering during sink initialization and hydrant swapping (e.g. h0, h1, h4)

This commit is contained in:
jon-wei 2015-09-16 15:25:34 -07:00
parent 9705c5139b
commit 9f6bb03ef4
3 changed files with 199 additions and 15 deletions

View File

@ -744,6 +744,14 @@ public class RealtimePlumber implements Plumber
) )
); );
} }
if (hydrants.isEmpty()) {
// Probably encountered a corrupt sink directory
log.warn(
"Found persisted segment directory with no intermediate segments present at %s, skipping sink creation.",
sinkDir.getAbsolutePath()
);
continue;
}
Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants); Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants);
sinks.put(sinkInterval.getStartMillis(), currSink); sinks.put(sinkInterval.getStartMillis(), currSink);
sinkTimeline.add( sinkTimeline.add(

View File

@ -85,11 +85,13 @@ public class Sink implements Iterable<FireHydrant>
this.interval = interval; this.interval = interval;
this.version = version; this.version = version;
int maxCount = -1;
for (int i = 0; i < hydrants.size(); ++i) { for (int i = 0; i < hydrants.size(); ++i) {
final FireHydrant hydrant = hydrants.get(i); final FireHydrant hydrant = hydrants.get(i);
if (hydrant.getCount() != i) { if (hydrant.getCount() <= maxCount) {
throw new ISE("hydrant[%s] not the right count[%s]", hydrant, i); throw new ISE("hydrant[%s] not the right count[%s]", hydrant, i);
} }
maxCount = hydrant.getCount();
} }
this.hydrants.addAll(hydrants); this.hydrants.addAll(hydrants);
@ -167,13 +169,13 @@ public class Sink implements Iterable<FireHydrant>
Lists.<String>newArrayList(), Lists.<String>newArrayList(),
Lists.transform( Lists.transform(
Arrays.asList(schema.getAggregators()), new Function<AggregatorFactory, String>() Arrays.asList(schema.getAggregators()), new Function<AggregatorFactory, String>()
{ {
@Override @Override
public String apply(@Nullable AggregatorFactory input) public String apply(@Nullable AggregatorFactory input)
{ {
return input.getName(); return input.getName();
} }
} }
), ),
config.getShardSpec(), config.getShardSpec(),
null, null,
@ -208,7 +210,13 @@ public class Sink implements Iterable<FireHydrant>
final FireHydrant old; final FireHydrant old;
synchronized (hydrantLock) { synchronized (hydrantLock) {
old = currHydrant; old = currHydrant;
currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier()); int newCount = 0;
int numHydrants = hydrants.size();
if (numHydrants > 0) {
FireHydrant lastHydrant = hydrants.get(numHydrants - 1);
newCount = lastHydrant.getCount() + 1;
}
currHydrant = new FireHydrant(newIndex, newCount, getSegment().getIdentifier());
hydrants.add(currHydrant); hydrants.add(currHydrant);
} }

View File

@ -22,6 +22,7 @@ package io.druid.segment.realtime.plumber;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Suppliers; import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.io.Files; import com.google.common.io.Files;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
@ -29,8 +30,10 @@ import com.metamx.common.Granularity;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.FilteredServerView; import io.druid.client.FilteredServerView;
import io.druid.client.ServerView; import io.druid.client.ServerView;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.Committer; import io.druid.data.input.Committer;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec; import io.druid.data.input.impl.JSONParseSpec;
@ -49,6 +52,7 @@ import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.SegmentPublisher; import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
@ -69,9 +73,11 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
*/ */
@ -88,6 +94,7 @@ public class RealtimePlumberSchoolTest
private ServiceEmitter emitter; private ServiceEmitter emitter;
private RealtimeTuningConfig tuningConfig; private RealtimeTuningConfig tuningConfig;
private DataSchema schema; private DataSchema schema;
private DataSchema schema2;
private FireDepartmentMetrics metrics; private FireDepartmentMetrics metrics;
public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy) public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy)
@ -134,6 +141,22 @@ public class RealtimePlumberSchoolTest
jsonMapper jsonMapper
); );
schema2 = new DataSchema(
"test",
jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(null, null, null)
)
),
Map.class
),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.YEAR, QueryGranularity.NONE, null),
jsonMapper
);
announcer = EasyMock.createMock(DataSegmentAnnouncer.class); announcer = EasyMock.createMock(DataSegmentAnnouncer.class);
announcer.announceSegment(EasyMock.<DataSegment>anyObject()); announcer.announceSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().anyTimes(); EasyMock.expectLastCall().anyTimes();
@ -212,7 +235,7 @@ public class RealtimePlumberSchoolTest
private void testPersist(final Object commitMetadata) throws Exception private void testPersist(final Object commitMetadata) throws Exception
{ {
final MutableBoolean committed = new MutableBoolean(false); final AtomicBoolean committed = new AtomicBoolean(false);
plumber.getSinks() plumber.getSinks()
.put( .put(
0L, 0L,
@ -240,13 +263,13 @@ public class RealtimePlumberSchoolTest
@Override @Override
public void run() public void run()
{ {
committed.setValue(true); committed.set(true);
} }
}; };
plumber.add(row, Suppliers.ofInstance(committer)); plumber.add(row, Suppliers.ofInstance(committer));
plumber.persist(committer); plumber.persist(committer);
while (!committed.booleanValue()) { while (!committed.get()) {
Thread.sleep(100); Thread.sleep(100);
} }
plumber.getSinks().clear(); plumber.getSinks().clear();
@ -256,7 +279,7 @@ public class RealtimePlumberSchoolTest
@Test(timeout = 60000) @Test(timeout = 60000)
public void testPersistFails() throws Exception public void testPersistFails() throws Exception
{ {
final MutableBoolean committed = new MutableBoolean(false); final AtomicBoolean committed = new AtomicBoolean(false);
plumber.getSinks() plumber.getSinks()
.put( .put(
0L, 0L,
@ -280,13 +303,13 @@ public class RealtimePlumberSchoolTest
@Override @Override
public void run() public void run()
{ {
committed.setValue(true); committed.set(true);
throw new RuntimeException(); throw new RuntimeException();
} }
} }
).get() ).get()
); );
while (!committed.booleanValue()) { while (!committed.get()) {
Thread.sleep(100); Thread.sleep(100);
} }
@ -297,4 +320,149 @@ public class RealtimePlumberSchoolTest
Assert.assertEquals(1, metrics.failedPersists()); Assert.assertEquals(1, metrics.failedPersists());
} }
/**
 * Runs the hydrant-gap persistence scenario with a fixed dummy commit metadata value.
 * The JUnit timeout bounds the polling loop inside the helper.
 */
@Test(timeout = 60000)
public void testPersistHydrantGaps() throws Exception
{
  testPersistHydrantGapsHelper("dummyCommitMetadata");
}
/**
 * Checks that sinks are restored from disk correctly when the persisted hydrant directories are
 * not numbered contiguously, and that a sink directory with no hydrants left produces no sink.
 *
 * The scenario has three phases:
 * 1. Add five rows to a fresh plumber, persist, and verify directories "0".."4" exist.
 * 2. Delete directories "1" and "3", bootstrap a new plumber from disk, and verify the first
 *    three restored hydrants carry counts 0, 2 and 4 with the expected data intervals.
 * 3. Delete the remaining directories, bootstrap again, and verify no sink is created.
 *
 * @param commitMetadata value returned by the test committer's getMetadata()
 * @throws Exception if persisting, file deletion or bootstrapping fails
 */
private void testPersistHydrantGapsHelper(final Object commitMetadata) throws Exception
{
  final AtomicBoolean committed = new AtomicBoolean(false);
  final Interval testInterval = new Interval(new DateTime("1970-01-01"), new DateTime("1971-01-01"));

  final RealtimePlumber plumber2 =
      (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics);
  plumber2.getSinks().put(
      0L,
      new Sink(
          testInterval,
          schema2,
          tuningConfig,
          new DateTime("2014-12-01T12:34:56.789").toString()
      )
  );
  Assert.assertNull(plumber2.startJob());

  final Committer committer = new Committer()
  {
    @Override
    public Object getMetadata()
    {
      return commitMetadata;
    }

    @Override
    public void run()
    {
      // Signals the polling loop below that the commit has run.
      committed.set(true);
    }
  };

  // NOTE(review): presumably tuningConfig limits rows in memory so that each add ends up in its
  // own hydrant; the directory check below relies on five hydrants being written -- confirm.
  final String[] rowTimes = {"1970-01-01", "1970-02-01", "1970-03-01", "1970-04-01", "1970-05-01"};
  for (String rowTime : rowTimes) {
    plumber2.add(getTestInputRow(rowTime), Suppliers.ofInstance(committer));
  }

  plumber2.persist(committer);

  // Poll until the committer has run; the caller's test timeout bounds this loop.
  while (!committed.get()) {
    Thread.sleep(100);
  }
  plumber2.getSinks().clear();
  plumber2.finishJob();

  final File persistDir = plumber2.computePersistDir(schema2, testInterval);

  /* Check that all hydrants were persisted */
  for (int i = 0; i < 5; i++) {
    Assert.assertTrue(new File(persistDir, String.valueOf(i)).exists());
  }

  /* Create some gaps in the persisted hydrants and reload */
  for (String removed : new String[]{"1", "3"}) {
    FileUtils.deleteDirectory(new File(persistDir, removed));
  }
  final RealtimePlumber restoredPlumber =
      (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics);
  restoredPlumber.bootstrapSinksFromDisk();

  final Map<Long, Sink> sinks = restoredPlumber.getSinks();
  Assert.assertEquals(1, sinks.size());

  // Look the sink up with the same 0L key used for put() above (no deprecated `new Long(0)`).
  final List<FireHydrant> hydrants = Lists.newArrayList(sinks.get(0L));
  final DateTime startTime = new DateTime("1970-01-01T00:00:00.000Z");

  // Surviving hydrants keep their original counts (0, 2, 4) at list positions 0, 1, 2.
  final int[] expectedCounts = {0, 2, 4};
  final String[] expectedEnds = {
      "1970-01-01T00:00:00.001Z",
      "1970-03-01T00:00:00.001Z",
      "1970-05-01T00:00:00.001Z"
  };
  for (int i = 0; i < expectedCounts.length; i++) {
    Assert.assertEquals(expectedCounts[i], hydrants.get(i).getCount());
    Assert.assertEquals(
        new Interval(startTime, new DateTime(expectedEnds[i])),
        hydrants.get(i).getSegment().getDataInterval()
    );
  }

  /* Delete all the hydrants and reload, no sink should be created */
  for (String removed : new String[]{"0", "2", "4"}) {
    FileUtils.deleteDirectory(new File(persistDir, removed));
  }
  final RealtimePlumber restoredPlumber2 =
      (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics);
  restoredPlumber2.bootstrapSinksFromDisk();

  Assert.assertEquals(0, restoredPlumber2.getSinks().size());
}
/**
 * Builds a stub InputRow whose only meaningful content is its timestamp.
 *
 * The row reports no dimensions, returns zero for every metric, null for every raw value, and
 * compares as equal to any other row.
 *
 * @param timeStr timestamp text handed to the DateTime constructor each time the timestamp is read
 * @return a stub row for the given timestamp
 */
private InputRow getTestInputRow(final String timeStr)
{
  return new InputRow()
  {
    @Override
    public List<String> getDimensions()
    {
      return new ArrayList<String>();
    }

    @Override
    public long getTimestampFromEpoch()
    {
      // Same value as parsing timeStr directly; goes through getTimestamp() to avoid repetition.
      return getTimestamp().getMillis();
    }

    @Override
    public DateTime getTimestamp()
    {
      return new DateTime(timeStr);
    }

    @Override
    public List<String> getDimension(String dimension)
    {
      return new ArrayList<String>();
    }

    @Override
    public float getFloatMetric(String metric)
    {
      return 0;
    }

    @Override
    public long getLongMetric(String metric)
    {
      return 0L;
    }

    @Override
    public Object getRaw(String dimension)
    {
      return null;
    }

    @Override
    public int compareTo(Row o)
    {
      return 0;
    }
  };
}
} }