Merge pull request #549 from metamx/fix-sink

fix race condition with merge and persist and sink adding
This commit is contained in:
Gian Merlino 2014-05-16 15:40:31 -07:00
commit e4db0728f2
10 changed files with 80 additions and 54 deletions

View File

@ -31,6 +31,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.Granularity;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.IndexIO;
@ -112,6 +113,16 @@ public class YeOldePlumberSchool implements PlumberSchool
}
@Override
public int add(InputRow row)
{
  // Look up the sink for the row's timestamp; when there is none, report -1
  // (the Plumber contract's "row was thrown away" value) instead of adding.
  final Sink target = getSink(row.getTimestampFromEpoch());
  return (target == null) ? -1 : target.add(row);
}
public Sink getSink(long timestamp)
{
if (theSink.getInterval().contains(timestamp)) {

View File

@ -398,17 +398,15 @@ public class IndexTask extends AbstractFixedIntervalTask
final InputRow inputRow = firehose.nextRow();
if (shouldIndex(shardSpec, interval, inputRow)) {
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if (sink == null) {
throw new NullPointerException(
int numRows = plumber.add(inputRow);
if (numRows == -1) {
throw new ISE(
String.format(
"Was expecting non-null sink for timestamp[%s]",
new DateTime(inputRow.getTimestampFromEpoch())
)
);
}
int numRows = sink.add(inputRow);
metrics.incrementProcessed();
if (numRows >= myRowFlushBoundary) {

View File

@ -335,8 +335,8 @@ public class RealtimeIndexTask extends AbstractTask
continue;
}
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if (sink == null) {
int currCount = plumber.add(inputRow);
if (currCount == -1) {
fireDepartment.getMetrics().incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
@ -348,11 +348,6 @@ public class RealtimeIndexTask extends AbstractTask
continue;
}
if (sink.isEmpty()) {
log.info("Task %s: New sink: %s", getId(), sink);
}
int currCount = sink.add(inputRow);
fireDepartment.getMetrics().incrementProcessed();
if (currCount >= tuningConfig.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());

View File

@ -43,7 +43,6 @@ import io.druid.query.SegmentDescriptor;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Sink;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
@ -197,8 +196,8 @@ public class RealtimeManager implements QuerySegmentWalker
continue;
}
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if (sink == null) {
int currCount = plumber.add(inputRow);
if (currCount == -1) {
metrics.incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
@ -209,8 +208,6 @@ public class RealtimeManager implements QuerySegmentWalker
continue;
}
int currCount = sink.add(inputRow);
if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
@ -225,13 +222,11 @@ public class RealtimeManager implements QuerySegmentWalker
}
}
catch (RuntimeException e) {
log.makeAlert(
e,
"RuntimeException aborted realtime processing[%s]",
fireDepartment.getDataSchema().getDataSource()
)
.emit();
).emit();
normalExit = false;
throw e;
}

View File

@ -19,6 +19,7 @@
package io.druid.segment.realtime.plumber;
import io.druid.data.input.InputRow;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -30,7 +31,12 @@ public interface Plumber
*/
public void startJob();
public Sink getSink(long timestamp);
/**
 * Inserts a single row into this plumber.
 *
 * @param row - the row to insert
 * @return - positive numbers indicate how many summarized rows exist in the index for that timestamp,
 * -1 means a row was thrown away because it was too late
 */
public int add(InputRow row);
public <T> QueryRunner<T> getQueryRunner(Query<T> query);
/**

View File

@ -21,6 +21,7 @@ import io.druid.client.ServerView;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -72,6 +73,7 @@ public class RealtimePlumber implements Plumber
private final DataSchema schema;
private final RealtimeTuningConfig config;
private final RejectionPolicy rejectionPolicy;
private final FireDepartmentMetrics metrics;
private final ServiceEmitter emitter;
@ -151,6 +153,16 @@ public class RealtimePlumber implements Plumber
}
@Override
public int add(InputRow row)
{
  // Resolve the sink for the row's timestamp first; if getSink yields nothing
  // the row is not added and -1 is returned to the caller.
  final Sink target = getSink(row.getTimestampFromEpoch());
  return (target != null) ? target.add(row) : -1;
}
public Sink getSink(long timestamp)
{
if (!rejectionPolicy.accept(timestamp)) {

View File

@ -50,7 +50,7 @@ public class Sink implements Iterable<FireHydrant>
{
private static final Logger log = new Logger(Sink.class);
private volatile FireHydrant currIndex;
private volatile FireHydrant currHydrant;
private final Interval interval;
private final DataSchema schema;
@ -107,31 +107,35 @@ public class Sink implements Iterable<FireHydrant>
return interval;
}
public FireHydrant getCurrIndex()
public FireHydrant getCurrHydrant()
{
return currIndex;
return currHydrant;
}
public int add(InputRow row)
{
if (currIndex == null) {
throw new IAE("No currIndex but given row[%s]", row);
if (currHydrant == null) {
throw new IAE("No currHydrant but given row[%s]", row);
}
synchronized (currIndex) {
return currIndex.getIndex().add(row);
synchronized (currHydrant) {
IncrementalIndex index = currHydrant.getIndex();
if (index == null) {
return -1; // the hydrant was swapped without being replaced
}
return index.add(row);
}
}
public boolean isEmpty()
{
synchronized (currIndex) {
return hydrants.size() == 1 && currIndex.getIndex().isEmpty();
synchronized (currHydrant) {
return hydrants.size() == 1 && currHydrant.getIndex().isEmpty();
}
}
/**
* If currIndex is A, creates a new index B, sets currIndex to B and returns A.
* If currHydrant is A, creates a new index B, sets currHydrant to B and returns A.
*
* @return the current index after swapping in a new one
*/
@ -142,8 +146,8 @@ public class Sink implements Iterable<FireHydrant>
public boolean swappable()
{
synchronized (currIndex) {
return currIndex.getIndex() != null && currIndex.getIndex().size() != 0;
synchronized (currHydrant) {
return currHydrant.getIndex() != null && currHydrant.getIndex().size() != 0;
}
}
@ -189,15 +193,15 @@ public class Sink implements Iterable<FireHydrant>
);
FireHydrant old;
if (currIndex == null) { // Only happens on initialization, cannot synchronize on null
old = currIndex;
currIndex = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier());
hydrants.add(currIndex);
if (currHydrant == null) { // Only happens on initialization, cannot synchronize on null
old = currHydrant;
currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier());
hydrants.add(currHydrant);
} else {
synchronized (currIndex) {
old = currIndex;
currIndex = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier());
hydrants.add(currIndex);
synchronized (currHydrant) {
old = currHydrant;
currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier());
hydrants.add(currHydrant);
}
}

View File

@ -258,6 +258,21 @@ public class RealtimeManagerTest
}
@Override
public int add(InputRow row)
{
  // A missing row and a row with no matching sink are both reported as -1.
  final Sink target = (row == null) ? null : getSink(row.getTimestampFromEpoch());
  if (target != null) {
    return target.add(row);
  }
  return -1;
}
public Sink getSink(long timestamp)
{
if (sink.getInterval().contains(timestamp)) {

View File

@ -161,16 +161,6 @@ public class RealtimePlumberSchoolTest
EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
}
@Test
public void testGetSink() throws Exception
{
  // The sink returned for a timestamp should span one hour starting at that
  // timestamp and carry the timestamp's string form as its version.
  final DateTime theTime = new DateTime("2013-01-01");
  final Interval expectedInterval = new Interval(String.format("%s/PT1H", theTime.toString()));
  final String expectedVersion = theTime.toString();

  final Sink sink = plumber.getSink(theTime.getMillis());

  Assert.assertEquals(expectedInterval, sink.getInterval());
  Assert.assertEquals(expectedVersion, sink.getVersion());
}
@Test
public void testPersist() throws Exception
{

View File

@ -101,7 +101,7 @@ public class SinkTest
}
);
FireHydrant currHydrant = sink.getCurrIndex();
FireHydrant currHydrant = sink.getCurrHydrant();
Assert.assertEquals(new Interval("2013-01-01/PT1M"), currHydrant.getIndex().getInterval());
@ -143,8 +143,8 @@ public class SinkTest
);
Assert.assertEquals(currHydrant, swapHydrant);
Assert.assertNotSame(currHydrant, sink.getCurrIndex());
Assert.assertEquals(new Interval("2013-01-01/PT1M"), sink.getCurrIndex().getIndex().getInterval());
Assert.assertNotSame(currHydrant, sink.getCurrHydrant());
Assert.assertEquals(new Interval("2013-01-01/PT1M"), sink.getCurrHydrant().getIndex().getInterval());
Assert.assertEquals(2, Iterators.size(sink.iterator()));
}