diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index 70affc17854..8024edf3daa 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -187,15 +187,15 @@ public class ZkCoordinator extends BaseZkCoordinator new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) { backgroundSegmentAnnouncer.startAnnouncing(); - final List> segmentLoading = Lists.newArrayList(); + final List segmentLoading = Lists.newArrayList(); for (final DataSegment segment : segments) { segmentLoading.add( getLoadingExecutor().submit( - new Callable() + new Callable() { @Override - public Boolean call() throws SegmentLoadingException + public Void call() throws SegmentLoadingException { try { log.info("Loading segment %s", segment.getIdentifier()); @@ -209,7 +209,7 @@ public class ZkCoordinator extends BaseZkCoordinator throw new SegmentLoadingException(e, "Loading Interrupted"); } } - return loaded; + return null; } catch(SegmentLoadingException e) { log.error(e, "[%s] failed to load", segment.getIdentifier()); throw e; @@ -299,6 +299,8 @@ public class ZkCoordinator extends BaseZkCoordinator private final LinkedBlockingQueue queue; private final SettableFuture doneAnnouncing; + private final Object lock = new Object(); + private volatile boolean finished = false; private volatile ScheduledFuture startedAnnouncing = null; private volatile ScheduledFuture nextAnnoucement = null; @@ -339,25 +341,27 @@ public class ZkCoordinator extends BaseZkCoordinator @Override public void run() { - try { - if (!(finished && queue.isEmpty())) { - List segments = Lists.newLinkedList(); - queue.drainTo(segments); - try { - announcer.announceSegments(segments); - nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS); + synchronized (lock) { + try { + if (!(finished && queue.isEmpty())) { + final List segments = Lists.newLinkedList(); + queue.drainTo(segments); + try { + announcer.announceSegments(segments); + nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS); + } + catch (IOException e) { + doneAnnouncing.setException( + new SegmentLoadingException(e, "Failed to announce segments[%s]", segments) + ); + } + } else { + doneAnnouncing.set(true); } - catch (IOException e) { - doneAnnouncing.setException( - new SegmentLoadingException(e, "Failed to announce segments[%s]", segments) - ); - } - } else { - doneAnnouncing.set(true); } - } - catch (Exception e) { - doneAnnouncing.setException(e); + catch (Exception e) { + doneAnnouncing.setException(e); + } } } }, @@ -368,13 +372,29 @@ public class ZkCoordinator extends BaseZkCoordinator public void finishAnnouncing() throws SegmentLoadingException { - finished = true; - - if (startedAnnouncing != null && (startedAnnouncing.isDone() || !startedAnnouncing.cancel(false))) { - log.info("Waiting for background segment announcing task to complete"); - // background announcing already started, wait for it to complete + synchronized (lock) { + finished = true; + // announce any remaining segments try { - doneAnnouncing.get(); + final List segments = Lists.newLinkedList(); + queue.drainTo(segments); + announcer.announceSegments(segments); + } + catch (Exception e) { + throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue); + } + + // get any exception that may have been thrown in background annoucing + try { + // check in case we did not call startAnnouncing + if (startedAnnouncing != null) { + startedAnnouncing.cancel(false); + } + // - if the task is waiting on the lock, then the queue will be empty by the time it runs + // - if the task just released it, then the lock ensures any exception is set in doneAnnouncing + if (doneAnnouncing.isDone()) { + doneAnnouncing.get(); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -383,14 +403,6 @@ public class ZkCoordinator extends BaseZkCoordinator catch (ExecutionException e) { throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed"); } - } else { - // background task has not started yet, announcing immediately - try { - announcer.announceSegments(queue); - } - catch (Exception e) { - throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue); - } } log.info("Completed background segment announcing"); } @@ -398,9 +410,12 @@ public class ZkCoordinator extends BaseZkCoordinator @Override public void close() { - finished = true; - if (nextAnnoucement != null) { - nextAnnoucement.cancel(false); + // stop background scheduling + synchronized (lock) { + finished = true; + if (nextAnnoucement != null) { + nextAnnoucement.cancel(false); + } } } }