From 32b6135f3d7379910f7452417afbddb2558f4ce8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 18 Sep 2014 12:01:28 -0700 Subject: [PATCH] background announce segment cache --- .../segment/loading/SegmentLoaderConfig.java | 8 + .../server/coordination/ZkCoordinator.java | 144 ++++++++++++++++-- 2 files changed, 140 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java index 2cba40194a0..bbed8d27f47 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java @@ -40,6 +40,9 @@ public class SegmentLoaderConfig @JsonProperty("dropSegmentDelayMillis") private int dropSegmentDelayMillis = 30 * 1000; // 30 seconds + @JsonProperty("announceIntervalMillis") + private int announceIntervalMillis = 5 * 1000; // 5 seconds + @JsonProperty private File infoDir = null; @@ -58,6 +61,11 @@ public class SegmentLoaderConfig return dropSegmentDelayMillis; } + public int getAnnounceIntervalMillis() + { + return announceIntervalMillis; + } + public File getInfoDir() { if (infoDir == null) { diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index b0611077aa3..ec2510df1fb 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -21,7 +21,10 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import com.google.common.collect.Queues; +import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; +import com.metamx.common.ISE; import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.emitter.EmittingLogger; import io.druid.segment.loading.SegmentLoaderConfig; @@ -33,7 +36,10 @@ import org.apache.curator.framework.CuratorFramework; import java.io.File; import java.io.IOException; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** @@ -170,10 +176,11 @@ public class ZkCoordinator extends BaseZkCoordinator public void addSegments(Iterable segments, DataSegmentChangeCallback callback) { - try { - final List segmentFailures = Lists.newArrayList(); - final List validSegments = Lists.newArrayList(); + try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer = + new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) { + backgroundSegmentAnnouncer.startAnnouncing(); + final List segmentFailures = Lists.newArrayList(); for (DataSegment segment : segments) { log.info("Loading segment %s", segment.getIdentifier()); @@ -202,26 +209,25 @@ public class ZkCoordinator extends BaseZkCoordinator } } - validSegments.add(segment); + try { + backgroundSegmentAnnouncer.announceSegment(segment); + } catch(InterruptedException e) { + throw new SegmentLoadingException(e, "Loading Interrupted"); + } } } - try { - announcer.announceSegments(validSegments); - } - catch (IOException e) { - throw new SegmentLoadingException(e, "Failed to announce segments[%s]", segments); - } - if (!segmentFailures.isEmpty()) { for (String segmentFailure : segmentFailures) { log.error("%s failed to load", segmentFailure); } throw new SegmentLoadingException("%,d errors seen while loading segments", segmentFailures.size()); } + + backgroundSegmentAnnouncer.finishAnnouncing(); } catch (SegmentLoadingException e) { - log.makeAlert(e, "Failed to load segments for dataSource") + log.makeAlert(e, "Failed to load segments") .addData("segments", segments) .emit(); } @@ -272,4 +278,118 @@ public class ZkCoordinator extends BaseZkCoordinator callback.execute(); } } + + private static class BackgroundSegmentAnnouncer implements AutoCloseable { + private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class); + + private final int intervalMillis; + private final DataSegmentAnnouncer announcer; + private final ScheduledExecutorService exec; + private final LinkedBlockingQueue queue; + private final SettableFuture doneAnnouncing; + + private volatile boolean finished = false; + private volatile ScheduledFuture startedAnnouncing = null; + private volatile ScheduledFuture nextAnnoucement = null; + + private BackgroundSegmentAnnouncer( + DataSegmentAnnouncer announcer, + ScheduledExecutorService exec, + int intervalMillis + ) + { + this.announcer = announcer; + this.exec = exec; + this.intervalMillis = intervalMillis; + this.queue = Queues.newLinkedBlockingQueue(); + this.doneAnnouncing = SettableFuture.create(); + } + + public void announceSegment(final DataSegment segment) throws InterruptedException + { + if (finished) { + throw new ISE("Announce segment called after finishAnnouncing"); + } + queue.put(segment); + } + + public void startAnnouncing() + { + if (intervalMillis <= 0) { + return; + } + + log.info("Starting background segment announcing task"); + + // schedule background announcing task + nextAnnoucement = startedAnnouncing = exec.schedule( + new Runnable() + { + @Override + public void run() + { + try { + if (!(finished && queue.isEmpty())) { + List segments = Lists.newLinkedList(); + queue.drainTo(segments); + try { + announcer.announceSegments(segments); + nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS); + } + catch (IOException e) { + doneAnnouncing.setException( + new SegmentLoadingException(e, "Failed to announce segments[%s]", segments) + ); + } + } else { + doneAnnouncing.set(true); + } + } + catch (Exception e) { + doneAnnouncing.setException(e); + } + } + }, + intervalMillis, + TimeUnit.MILLISECONDS + ); + } + + public void finishAnnouncing() throws SegmentLoadingException + { + finished = true; + + if (startedAnnouncing != null && (startedAnnouncing.isDone() || !startedAnnouncing.cancel(false))) { + log.info("Waiting for background segment announcing task to complete"); + // background announcing already started, wait for it to complete + try { + doneAnnouncing.get(); + } + catch (InterruptedException e) { + throw new SegmentLoadingException(e, "Loading Interrupted"); + } + catch (ExecutionException e) { + throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed"); + } + } else { + // background task has not started yet, announcing immediately + try { + announcer.announceSegments(queue); + } + catch (Exception e) { + throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue); + } + } + log.info("Completed background segment announcing"); + } + + @Override + public void close() + { + finished = true; + if (nextAnnoucement != null) { + nextAnnoucement.cancel(false); + } + } + } }