Merge pull request #1171 from druid-io/fix-rt-task

Add more metrics and fail when realtime tasks fail to persist and merge
This commit is contained in:
Xavier Léauté 2015-03-06 16:42:23 -08:00
commit e161021902
4 changed files with 93 additions and 4 deletions

View File

@ -32,6 +32,8 @@ public class FireDepartmentMetrics
private final AtomicLong numPersists = new AtomicLong(0); private final AtomicLong numPersists = new AtomicLong(0);
private final AtomicLong persistTimeMillis = new AtomicLong(0); private final AtomicLong persistTimeMillis = new AtomicLong(0);
private final AtomicLong persistBackPressureMillis = new AtomicLong(0); private final AtomicLong persistBackPressureMillis = new AtomicLong(0);
private final AtomicLong failedPersists = new AtomicLong(0);
private final AtomicLong failedHandoffs = new AtomicLong(0);
public void incrementProcessed() public void incrementProcessed()
{ {
@ -68,6 +70,16 @@ public class FireDepartmentMetrics
persistBackPressureMillis.addAndGet(millis); persistBackPressureMillis.addAndGet(millis);
} }
public void incrementFailedPersists()
{
failedPersists.incrementAndGet();
}
public void incrementFailedHandoffs()
{
failedHandoffs.incrementAndGet();
}
public long processed() public long processed()
{ {
return processedCount.get(); return processedCount.get();
@ -103,6 +115,16 @@ public class FireDepartmentMetrics
return persistBackPressureMillis.get(); return persistBackPressureMillis.get();
} }
public long failedPersists()
{
return failedPersists.get();
}
public long failedHandoffs()
{
return failedHandoffs.get();
}
public FireDepartmentMetrics snapshot() public FireDepartmentMetrics snapshot()
{ {
final FireDepartmentMetrics retVal = new FireDepartmentMetrics(); final FireDepartmentMetrics retVal = new FireDepartmentMetrics();
@ -113,6 +135,8 @@ public class FireDepartmentMetrics
retVal.numPersists.set(numPersists.get()); retVal.numPersists.set(numPersists.get());
retVal.persistTimeMillis.set(persistTimeMillis.get()); retVal.persistTimeMillis.set(persistTimeMillis.get());
retVal.persistBackPressureMillis.set(persistBackPressureMillis.get()); retVal.persistBackPressureMillis.set(persistBackPressureMillis.get());
retVal.failedPersists.set(failedPersists.get());
retVal.failedHandoffs.set(failedHandoffs.get());
return retVal; return retVal;
} }
@ -132,6 +156,8 @@ public class FireDepartmentMetrics
numPersists.addAndGet(otherSnapshot.numPersists()); numPersists.addAndGet(otherSnapshot.numPersists());
persistTimeMillis.addAndGet(otherSnapshot.persistTimeMillis()); persistTimeMillis.addAndGet(otherSnapshot.persistTimeMillis());
persistBackPressureMillis.addAndGet(otherSnapshot.persistBackPressureMillis()); persistBackPressureMillis.addAndGet(otherSnapshot.persistBackPressureMillis());
failedPersists.addAndGet(otherSnapshot.failedPersists());
failedHandoffs.addAndGet(otherSnapshot.failedHandoffs());
return this; return this;
} }
} }

View File

@ -77,6 +77,8 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
metrics.persistBackPressureMillis() - previous.persistBackPressureMillis() metrics.persistBackPressureMillis() - previous.persistBackPressureMillis()
) )
); );
emitter.emit(builder.build("failed/persists", metrics.failedPersists() - previous.failedPersists()));
emitter.emit(builder.build("failed/handoff", metrics.failedHandoffs() - previous.failedHandoffs()));
previousValues.put(fireDepartment, metrics); previousValues.put(fireDepartment, metrics);
} }

View File

@ -117,6 +117,7 @@ public class RealtimePlumber implements Plumber
private volatile boolean shuttingDown = false; private volatile boolean shuttingDown = false;
private volatile boolean stopped = false; private volatile boolean stopped = false;
private volatile boolean cleanShutdown = true;
private volatile ExecutorService persistExecutor = null; private volatile ExecutorService persistExecutor = null;
private volatile ExecutorService mergeExecutor = null; private volatile ExecutorService mergeExecutor = null;
private volatile ScheduledExecutorService scheduledExecutor = null; private volatile ScheduledExecutorService scheduledExecutor = null;
@ -355,6 +356,10 @@ public class RealtimePlumber implements Plumber
} }
commitRunnable.run(); commitRunnable.run();
} }
catch (Exception e) {
metrics.incrementFailedPersists();
throw e;
}
finally { finally {
metrics.incrementNumPersists(); metrics.incrementNumPersists();
metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS)); metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
@ -460,12 +465,14 @@ public class RealtimePlumber implements Plumber
} }
} }
catch (Exception e) { catch (Exception e) {
metrics.incrementFailedHandoffs();
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource()) log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
.addData("interval", interval) .addData("interval", interval)
.emit(); .emit();
if (shuttingDown) { if (shuttingDown) {
// We're trying to shut down, and this segment failed to push. Let's just get rid of it. // We're trying to shut down, and this segment failed to push. Let's just get rid of it.
// This call will also delete possibly-partially-written files, so we don't need to do it explicitly. // This call will also delete possibly-partially-written files, so we don't need to do it explicitly.
cleanShutdown = false;
abandonSegment(truncatedTime, sink); abandonSegment(truncatedTime, sink);
} }
} }
@ -518,6 +525,10 @@ public class RealtimePlumber implements Plumber
shutdownExecutors(); shutdownExecutors();
stopped = true; stopped = true;
if (!cleanShutdown) {
throw new ISE("Exception occurred during persist and merge.");
}
} }
protected void initializeExecutors() protected void initializeExecutors()

View File

@ -45,6 +45,7 @@ import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.SegmentPublisher; import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import junit.framework.Assert;
import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.lang.mutable.MutableBoolean;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -77,6 +78,7 @@ public class RealtimePlumberSchoolTest
private ServiceEmitter emitter; private ServiceEmitter emitter;
private RealtimeTuningConfig tuningConfig; private RealtimeTuningConfig tuningConfig;
private DataSchema schema; private DataSchema schema;
private FireDepartmentMetrics metrics;
public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy) public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy)
{ {
@ -101,7 +103,6 @@ public class RealtimePlumberSchoolTest
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
final File tmpDir = Files.createTempDir(); final File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit(); tmpDir.deleteOnExit();
@ -176,7 +177,8 @@ public class RealtimePlumberSchoolTest
MoreExecutors.sameThreadExecutor() MoreExecutors.sameThreadExecutor()
); );
plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, new FireDepartmentMetrics()); metrics = new FireDepartmentMetrics();
plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, metrics);
} }
@After @After
@ -189,7 +191,16 @@ public class RealtimePlumberSchoolTest
public void testPersist() throws Exception public void testPersist() throws Exception
{ {
final MutableBoolean committed = new MutableBoolean(false); final MutableBoolean committed = new MutableBoolean(false);
plumber.getSinks().put(0L, new Sink(new Interval(0, TimeUnit.HOURS.toMillis(1)),schema, tuningConfig, new DateTime("2014-12-01T12:34:56.789").toString())); plumber.getSinks()
.put(
0L,
new Sink(
new Interval(0, TimeUnit.HOURS.toMillis(1)),
schema,
tuningConfig,
new DateTime("2014-12-01T12:34:56.789").toString()
)
);
plumber.startJob(); plumber.startJob();
final InputRow row = EasyMock.createNiceMock(InputRow.class); final InputRow row = EasyMock.createNiceMock(InputRow.class);
EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L); EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L);
@ -213,4 +224,43 @@ public class RealtimePlumberSchoolTest
plumber.getSinks().clear(); plumber.getSinks().clear();
plumber.finishJob(); plumber.finishJob();
} }
@Test(timeout = 60000)
public void testPersistFails() throws Exception
{
final MutableBoolean committed = new MutableBoolean(false);
plumber.getSinks()
.put(
0L,
new Sink(
new Interval(0, TimeUnit.HOURS.toMillis(1)),
schema,
tuningConfig,
new DateTime("2014-12-01T12:34:56.789").toString()
)
);
plumber.startJob();
final InputRow row = EasyMock.createNiceMock(InputRow.class);
EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L);
EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>());
EasyMock.replay(row);
plumber.add(row);
plumber.persist(
new Runnable()
{
@Override
public void run()
{
committed.setValue(true);
throw new RuntimeException();
}
}
);
while (!committed.booleanValue()) {
Thread.sleep(100);
}
Assert.assertEquals(1, metrics.failedPersists());
}
} }