diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 0210219a3d3..b5a93e68942 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -449,10 +449,11 @@ public class RealtimePlumber implements Plumber protected void shutdownExecutors() { - // scheduledExecutor is shutdown here, but persistExecutor is shutdown when the + // scheduledExecutor is shutdown here, but mergeExecutor is shutdown when the // ServerView sends it a new segment callback if (scheduledExecutor != null) { scheduledExecutor.shutdown(); + persistExecutor.shutdown(); } } @@ -715,7 +716,7 @@ public class RealtimePlumber implements Plumber private void registerServerViewCallback() { serverView.registerSegmentCallback( - persistExecutor, + mergeExecutor, new ServerView.BaseSegmentCallback() { @Override @@ -723,7 +724,7 @@ public class RealtimePlumber implements Plumber { if (stopped) { log.info("Unregistering ServerViewCallback"); - persistExecutor.shutdown(); + mergeExecutor.shutdown(); return ServerView.CallbackAction.UNREGISTER; } @@ -732,7 +733,9 @@ public class RealtimePlumber implements Plumber } log.debug("Checking segment[%s] on server[%s]", segment, server); - if (schema.getDataSource().equals(segment.getDataSource())) { + if (schema.getDataSource().equals(segment.getDataSource()) + && schema.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum() + ) { final Interval interval = segment.getInterval(); for (Map.Entry entry : sinks.entrySet()) { final Long sinkKey = entry.getKey();