From ff56573910b26b065bcaf2823692a6f43a1861ec Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 1 May 2014 01:34:58 +0530 Subject: [PATCH] Fix race Fix race in ingestion that can cause FNF, Properly check for partitionNum shutdown persistExecutor --- .../segment/realtime/plumber/RealtimePlumber.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 0210219a3d3..b5a93e68942 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -449,10 +449,11 @@ public class RealtimePlumber implements Plumber protected void shutdownExecutors() { - // scheduledExecutor is shutdown here, but persistExecutor is shutdown when the + // scheduledExecutor is shutdown here, but mergeExecutor is shutdown when the // ServerView sends it a new segment callback if (scheduledExecutor != null) { scheduledExecutor.shutdown(); + persistExecutor.shutdown(); } } @@ -715,7 +716,7 @@ public class RealtimePlumber implements Plumber private void registerServerViewCallback() { serverView.registerSegmentCallback( - persistExecutor, + mergeExecutor, new ServerView.BaseSegmentCallback() { @Override @@ -723,7 +724,7 @@ public class RealtimePlumber implements Plumber { if (stopped) { log.info("Unregistering ServerViewCallback"); - persistExecutor.shutdown(); + mergeExecutor.shutdown(); return ServerView.CallbackAction.UNREGISTER; } @@ -732,7 +733,9 @@ public class RealtimePlumber implements Plumber } log.debug("Checking segment[%s] on server[%s]", segment, server); - if (schema.getDataSource().equals(segment.getDataSource())) { + if (schema.getDataSource().equals(segment.getDataSource()) + && schema.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum() + ) { final Interval interval = segment.getInterval(); for (Map.Entry entry : sinks.entrySet()) { final Long sinkKey = entry.getKey();