Fix race in ingestion that can cause FNF,
Properly check for partitionNum
shutdown persistExecutor
This commit is contained in:
nishantmonu51 2014-05-01 01:34:58 +05:30
parent ffd37bc7bd
commit ff56573910
1 changed files with 7 additions and 4 deletions

View File

@ -449,10 +449,11 @@ public class RealtimePlumber implements Plumber
protected void shutdownExecutors() protected void shutdownExecutors()
{ {
// scheduledExecutor is shutdown here, but persistExecutor is shutdown when the // scheduledExecutor is shutdown here, but mergeExecutor is shutdown when the
// ServerView sends it a new segment callback // ServerView sends it a new segment callback
if (scheduledExecutor != null) { if (scheduledExecutor != null) {
scheduledExecutor.shutdown(); scheduledExecutor.shutdown();
persistExecutor.shutdown();
} }
} }
@ -715,7 +716,7 @@ public class RealtimePlumber implements Plumber
private void registerServerViewCallback() private void registerServerViewCallback()
{ {
serverView.registerSegmentCallback( serverView.registerSegmentCallback(
persistExecutor, mergeExecutor,
new ServerView.BaseSegmentCallback() new ServerView.BaseSegmentCallback()
{ {
@Override @Override
@ -723,7 +724,7 @@ public class RealtimePlumber implements Plumber
{ {
if (stopped) { if (stopped) {
log.info("Unregistering ServerViewCallback"); log.info("Unregistering ServerViewCallback");
persistExecutor.shutdown(); mergeExecutor.shutdown();
return ServerView.CallbackAction.UNREGISTER; return ServerView.CallbackAction.UNREGISTER;
} }
@ -732,7 +733,9 @@ public class RealtimePlumber implements Plumber
} }
log.debug("Checking segment[%s] on server[%s]", segment, server); log.debug("Checking segment[%s] on server[%s]", segment, server);
if (schema.getDataSource().equals(segment.getDataSource())) { if (schema.getDataSource().equals(segment.getDataSource())
&& schema.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum()
) {
final Interval interval = segment.getInterval(); final Interval interval = segment.getInterval();
for (Map.Entry<Long, Sink> entry : sinks.entrySet()) { for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
final Long sinkKey = entry.getKey(); final Long sinkKey = entry.getKey();