Merge pull request #1747 from jon-wei/realtime_corrupt_sink_fix

Account for potential gaps in hydrants in sink initialization, hydrant swapping (e.g. h0, h1, h4)
This commit is contained in:
Fangjin Yang 2015-09-18 15:22:50 -07:00
commit 6605a60ef0
3 changed files with 199 additions and 15 deletions

View File

@ -744,6 +744,14 @@ public class RealtimePlumber implements Plumber
)
);
}
if (hydrants.isEmpty()) {
// Probably encountered a corrupt sink directory
log.warn(
"Found persisted segment directory with no intermediate segments present at %s, skipping sink creation.",
sinkDir.getAbsolutePath()
);
continue;
}
Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants);
sinks.put(sinkInterval.getStartMillis(), currSink);
sinkTimeline.add(

View File

@ -85,11 +85,13 @@ public class Sink implements Iterable<FireHydrant>
this.interval = interval;
this.version = version;
int maxCount = -1;
for (int i = 0; i < hydrants.size(); ++i) {
final FireHydrant hydrant = hydrants.get(i);
if (hydrant.getCount() != i) {
if (hydrant.getCount() <= maxCount) {
throw new ISE("hydrant[%s] not the right count[%s]", hydrant, i);
}
maxCount = hydrant.getCount();
}
this.hydrants.addAll(hydrants);
@ -167,13 +169,13 @@ public class Sink implements Iterable<FireHydrant>
Lists.<String>newArrayList(),
Lists.transform(
Arrays.asList(schema.getAggregators()), new Function<AggregatorFactory, String>()
{
@Override
public String apply(@Nullable AggregatorFactory input)
{
return input.getName();
}
}
{
@Override
public String apply(@Nullable AggregatorFactory input)
{
return input.getName();
}
}
),
config.getShardSpec(),
null,
@ -208,7 +210,13 @@ public class Sink implements Iterable<FireHydrant>
final FireHydrant old;
synchronized (hydrantLock) {
old = currHydrant;
currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier());
int newCount = 0;
int numHydrants = hydrants.size();
if (numHydrants > 0) {
FireHydrant lastHydrant = hydrants.get(numHydrants - 1);
newCount = lastHydrant.getCount() + 1;
}
currHydrant = new FireHydrant(newIndex, newCount, getSegment().getIdentifier());
hydrants.add(currHydrant);
}

View File

@ -22,6 +22,7 @@ package io.druid.segment.realtime.plumber;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.util.concurrent.MoreExecutors;
@ -29,8 +30,10 @@ import com.metamx.common.Granularity;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.FilteredServerView;
import io.druid.client.ServerView;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
@ -49,6 +52,7 @@ import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
@ -69,9 +73,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*/
@ -88,6 +94,7 @@ public class RealtimePlumberSchoolTest
private ServiceEmitter emitter;
private RealtimeTuningConfig tuningConfig;
private DataSchema schema;
private DataSchema schema2;
private FireDepartmentMetrics metrics;
public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy)
@ -134,6 +141,22 @@ public class RealtimePlumberSchoolTest
jsonMapper
);
schema2 = new DataSchema(
"test",
jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(null, null, null)
)
),
Map.class
),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.YEAR, QueryGranularity.NONE, null),
jsonMapper
);
announcer = EasyMock.createMock(DataSegmentAnnouncer.class);
announcer.announceSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().anyTimes();
@ -212,7 +235,7 @@ public class RealtimePlumberSchoolTest
private void testPersist(final Object commitMetadata) throws Exception
{
final MutableBoolean committed = new MutableBoolean(false);
final AtomicBoolean committed = new AtomicBoolean(false);
plumber.getSinks()
.put(
0L,
@ -240,13 +263,13 @@ public class RealtimePlumberSchoolTest
@Override
public void run()
{
committed.setValue(true);
committed.set(true);
}
};
plumber.add(row, Suppliers.ofInstance(committer));
plumber.persist(committer);
while (!committed.booleanValue()) {
while (!committed.get()) {
Thread.sleep(100);
}
plumber.getSinks().clear();
@ -256,7 +279,7 @@ public class RealtimePlumberSchoolTest
@Test(timeout = 60000)
public void testPersistFails() throws Exception
{
final MutableBoolean committed = new MutableBoolean(false);
final AtomicBoolean committed = new AtomicBoolean(false);
plumber.getSinks()
.put(
0L,
@ -280,13 +303,13 @@ public class RealtimePlumberSchoolTest
@Override
public void run()
{
committed.setValue(true);
committed.set(true);
throw new RuntimeException();
}
}
).get()
);
while (!committed.booleanValue()) {
while (!committed.get()) {
Thread.sleep(100);
}
@ -297,4 +320,149 @@ public class RealtimePlumberSchoolTest
Assert.assertEquals(1, metrics.failedPersists());
}
@Test(timeout = 60000)
public void testPersistHydrantGaps() throws Exception
{
final Object commitMetadata = "dummyCommitMetadata";
testPersistHydrantGapsHelper(commitMetadata);
}
private void testPersistHydrantGapsHelper(final Object commitMetadata) throws Exception
{
final AtomicBoolean committed = new AtomicBoolean(false);
Interval testInterval = new Interval(new DateTime("1970-01-01"), new DateTime("1971-01-01"));
RealtimePlumber plumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics);
plumber2.getSinks()
.put(
0L,
new Sink(
testInterval,
schema2,
tuningConfig,
new DateTime("2014-12-01T12:34:56.789").toString()
)
);
Assert.assertNull(plumber2.startJob());
final Committer committer = new Committer()
{
@Override
public Object getMetadata()
{
return commitMetadata;
}
@Override
public void run()
{
committed.set(true);
}
};
plumber2.add(getTestInputRow("1970-01-01"), Suppliers.ofInstance(committer));
plumber2.add(getTestInputRow("1970-02-01"), Suppliers.ofInstance(committer));
plumber2.add(getTestInputRow("1970-03-01"), Suppliers.ofInstance(committer));
plumber2.add(getTestInputRow("1970-04-01"), Suppliers.ofInstance(committer));
plumber2.add(getTestInputRow("1970-05-01"), Suppliers.ofInstance(committer));
plumber2.persist(committer);
while (!committed.get()) {
Thread.sleep(100);
}
plumber2.getSinks().clear();
plumber2.finishJob();
File persistDir = plumber2.computePersistDir(schema2, testInterval);
/* Check that all hydrants were persisted */
for (int i = 0; i < 5; i ++) {
Assert.assertTrue(new File(persistDir, String.valueOf(i)).exists());
}
/* Create some gaps in the persisted hydrants and reload */
FileUtils.deleteDirectory(new File(persistDir, "1"));
FileUtils.deleteDirectory(new File(persistDir, "3"));
RealtimePlumber restoredPlumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics);
restoredPlumber.bootstrapSinksFromDisk();
Map<Long, Sink> sinks = restoredPlumber.getSinks();
Assert.assertEquals(1, sinks.size());
List<FireHydrant> hydrants = Lists.newArrayList(sinks.get(new Long(0)));
DateTime startTime = new DateTime("1970-01-01T00:00:00.000Z");
Assert.assertEquals(0, hydrants.get(0).getCount());
Assert.assertEquals(new Interval(startTime, new DateTime("1970-01-01T00:00:00.001Z")),
hydrants.get(0).getSegment().getDataInterval());
Assert.assertEquals(2, hydrants.get(1).getCount());
Assert.assertEquals(new Interval(startTime, new DateTime("1970-03-01T00:00:00.001Z")),
hydrants.get(1).getSegment().getDataInterval());
Assert.assertEquals(4, hydrants.get(2).getCount());
Assert.assertEquals(new Interval(startTime, new DateTime("1970-05-01T00:00:00.001Z")),
hydrants.get(2).getSegment().getDataInterval());
/* Delete all the hydrants and reload, no sink should be created */
FileUtils.deleteDirectory(new File(persistDir, "0"));
FileUtils.deleteDirectory(new File(persistDir, "2"));
FileUtils.deleteDirectory(new File(persistDir, "4"));
RealtimePlumber restoredPlumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics);
restoredPlumber2.bootstrapSinksFromDisk();
Assert.assertEquals(0, restoredPlumber2.getSinks().size());
}
private InputRow getTestInputRow(final String timeStr) {
return new InputRow()
{
@Override
public List<String> getDimensions()
{
return Lists.newArrayList();
}
@Override
public long getTimestampFromEpoch()
{
return new DateTime(timeStr).getMillis();
}
@Override
public DateTime getTimestamp()
{
return new DateTime(timeStr);
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
@Override
public long getLongMetric(String metric)
{
return 0L;
}
@Override
public Object getRaw(String dimension)
{
return null;
}
@Override
public int compareTo(Row o)
{
return 0;
}
};
}
}