diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java b/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java index 402f97d2a5c..335aed6bda2 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java +++ b/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java @@ -32,6 +32,8 @@ public class FireDepartmentMetrics private final AtomicLong numPersists = new AtomicLong(0); private final AtomicLong persistTimeMillis = new AtomicLong(0); private final AtomicLong persistBackPressureMillis = new AtomicLong(0); + private final AtomicLong failedPersists = new AtomicLong(0); + private final AtomicLong failedHandoffs = new AtomicLong(0); public void incrementProcessed() { @@ -68,6 +70,16 @@ public class FireDepartmentMetrics persistBackPressureMillis.addAndGet(millis); } + public void incrementFailedPersists() + { + failedPersists.incrementAndGet(); + } + + public void incrementFailedHandoffs() + { + failedHandoffs.incrementAndGet(); + } + public long processed() { return processedCount.get(); @@ -103,6 +115,16 @@ public class FireDepartmentMetrics return persistBackPressureMillis.get(); } + public long failedPersists() + { + return failedPersists.get(); + } + + public long failedHandoffs() + { + return failedHandoffs.get(); + } + public FireDepartmentMetrics snapshot() { final FireDepartmentMetrics retVal = new FireDepartmentMetrics(); @@ -113,6 +135,8 @@ public class FireDepartmentMetrics retVal.numPersists.set(numPersists.get()); retVal.persistTimeMillis.set(persistTimeMillis.get()); retVal.persistBackPressureMillis.set(persistBackPressureMillis.get()); + retVal.failedPersists.set(failedPersists.get()); + retVal.failedHandoffs.set(failedHandoffs.get()); return retVal; } @@ -132,6 +156,8 @@ public class FireDepartmentMetrics numPersists.addAndGet(otherSnapshot.numPersists()); persistTimeMillis.addAndGet(otherSnapshot.persistTimeMillis()); persistBackPressureMillis.addAndGet(otherSnapshot.persistBackPressureMillis()); + failedPersists.addAndGet(otherSnapshot.failedPersists()); + failedHandoffs.addAndGet(otherSnapshot.failedHandoffs()); return this; } } diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java b/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java index 2b4afca34d0..9f8bb49b8ba 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java @@ -77,6 +77,8 @@ public class RealtimeMetricsMonitor extends AbstractMonitor metrics.persistBackPressureMillis() - previous.persistBackPressureMillis() ) ); + emitter.emit(builder.build("failed/persists", metrics.failedPersists() - previous.failedPersists())); + emitter.emit(builder.build("failed/handoff", metrics.failedHandoffs() - previous.failedHandoffs())); previousValues.put(fireDepartment, metrics); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 1f6eb5e31fe..3701948ec0f 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -117,6 +117,7 @@ public class RealtimePlumber implements Plumber private volatile boolean shuttingDown = false; private volatile boolean stopped = false; + private volatile boolean cleanShutdown = true; private volatile ExecutorService persistExecutor = null; private volatile ExecutorService mergeExecutor = null; private volatile ScheduledExecutorService scheduledExecutor = null; @@ -355,6 +356,10 @@ public class RealtimePlumber implements Plumber } commitRunnable.run(); } + catch (Exception e) { + metrics.incrementFailedPersists(); + throw e; + } finally { metrics.incrementNumPersists(); metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS)); @@ -460,12 +465,14 @@ public class RealtimePlumber implements Plumber } } catch (Exception e) { + metrics.incrementFailedHandoffs(); log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource()) .addData("interval", interval) .emit(); if (shuttingDown) { // We're trying to shut down, and this segment failed to push. Let's just get rid of it. // This call will also delete possibly-partially-written files, so we don't need to do it explicitly. + cleanShutdown = false; abandonSegment(truncatedTime, sink); } } @@ -518,6 +525,10 @@ public class RealtimePlumber implements Plumber shutdownExecutors(); stopped = true; + + if (!cleanShutdown) { + throw new ISE("Exception occurred during persist and merge."); + } } protected void initializeExecutors() diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 2ee60ae190d..b0e489f80d0 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -45,6 +45,7 @@ import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.SegmentPublisher; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; +import junit.framework.Assert; import org.apache.commons.lang.mutable.MutableBoolean; import org.easymock.EasyMock; import org.joda.time.DateTime; @@ -77,6 +78,7 @@ public class RealtimePlumberSchoolTest private ServiceEmitter emitter; private RealtimeTuningConfig tuningConfig; private DataSchema schema; + private FireDepartmentMetrics metrics; public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy) { @@ -101,11 +103,10 @@ public class RealtimePlumberSchoolTest @Before public void setUp() throws Exception { - final File tmpDir = Files.createTempDir(); tmpDir.deleteOnExit(); - schema = new DataSchema( + schema = new DataSchema( "test", new InputRowParser() { @@ -176,7 +177,8 @@ public class RealtimePlumberSchoolTest MoreExecutors.sameThreadExecutor() ); - plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, new FireDepartmentMetrics()); + metrics = new FireDepartmentMetrics(); + plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, metrics); } @After @@ -189,7 +191,16 @@ public class RealtimePlumberSchoolTest public void testPersist() throws Exception { final MutableBoolean committed = new MutableBoolean(false); - plumber.getSinks().put(0L, new Sink(new Interval(0, TimeUnit.HOURS.toMillis(1)),schema, tuningConfig, new DateTime("2014-12-01T12:34:56.789").toString())); + plumber.getSinks() + .put( + 0L, + new Sink( + new Interval(0, TimeUnit.HOURS.toMillis(1)), + schema, + tuningConfig, + new DateTime("2014-12-01T12:34:56.789").toString() + ) + ); plumber.startJob(); final InputRow row = EasyMock.createNiceMock(InputRow.class); EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L); @@ -213,4 +224,43 @@ public class RealtimePlumberSchoolTest plumber.getSinks().clear(); plumber.finishJob(); } + + @Test(timeout = 60000) + public void testPersistFails() throws Exception + { + final MutableBoolean committed = new MutableBoolean(false); + plumber.getSinks() + .put( + 0L, + new Sink( + new Interval(0, TimeUnit.HOURS.toMillis(1)), + schema, + tuningConfig, + new DateTime("2014-12-01T12:34:56.789").toString() + ) + ); + plumber.startJob(); + final InputRow row = EasyMock.createNiceMock(InputRow.class); + EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L); + EasyMock.expect(row.getDimensions()).andReturn(new ArrayList()); + EasyMock.replay(row); + plumber.add(row); + plumber.persist( + new Runnable() + { + @Override + public void run() + { + committed.setValue(true); + throw new RuntimeException(); + } + } + ); + + while (!committed.booleanValue()) { + Thread.sleep(100); + } + + Assert.assertEquals(1, metrics.failedPersists()); + } }