From 1aad7ce521ea21dfcaa6793b8c669266933f2761 Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Tue, 16 Jun 2015 19:11:04 -0700 Subject: [PATCH 1/2] Add a bit more information to the mapping logging logic. --- .../druid/server/coordination/ZkCoordinator.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index 3804db09041..29578ed68ba 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -34,6 +34,7 @@ import org.apache.curator.framework.CuratorFramework; import java.io.File; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -41,6 +42,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** */ @@ -86,8 +88,10 @@ public class ZkCoordinator extends BaseZkCoordinator } List cachedSegments = Lists.newArrayList(); - for (File file : baseDir.listFiles()) { - log.info("Loading segment cache file [%s]", file); + File[] segmentsToLoad = baseDir.listFiles(); + for (int i = 0; i < segmentsToLoad.length; i++) { + File file = segmentsToLoad[i]; + log.info("Loading segment cache file [%d/%d][%s].", i, segmentsToLoad.length, file); try { DataSegment segment = jsonMapper.readValue(file, DataSegment.class); if (serverManager.isSegmentCached(segment)) { @@ -179,7 +183,7 @@ public class ZkCoordinator extends BaseZkCoordinator } } - public void addSegments(Iterable segments, final DataSegmentChangeCallback callback) + public void addSegments(Collection segments, final DataSegmentChangeCallback callback) { try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer = new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) { @@ -187,6 +191,8 @@ public class ZkCoordinator extends BaseZkCoordinator final List segmentLoading = Lists.newArrayList(); + final int numSegments = segments.size(); + final AtomicLong counter = new AtomicLong(0); for (final DataSegment segment : segments) { segmentLoading.add( getLoadingExecutor().submit( @@ -196,7 +202,7 @@ public class ZkCoordinator extends BaseZkCoordinator public Void call() throws SegmentLoadingException { try { - log.info("Loading segment %s", segment.getIdentifier()); + log.info("Loading segment[%d/%d][%s]", counter.getAndIncrement(), numSegments, segment.getIdentifier()); final boolean loaded = loadSegment(segment, callback); if (loaded) { try { From 06c97b6d7db9eb74ebb52659630c0447e8332fee Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Wed, 17 Jun 2015 11:32:12 -0700 Subject: [PATCH 2/2] Separate out the "bootstrapping" threads from the "loading" threads. It is highly recommended to keep loading threads single threaded. There can be benefits to having multiple bootstrapping threads, though. Those bootstrapping threads shouldn't be kept alive for the whole life of the process, however. Also, adjust some logging and do some code cleanup. --- .../segment/loading/SegmentLoaderConfig.java | 7 ++ .../coordination/BaseZkCoordinator.java | 17 +--- .../server/coordination/ZkCoordinator.java | 96 ++++++++++--------- 3 files changed, 61 insertions(+), 59 deletions(-) diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java index acfb44184ad..a5c67b595f7 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java @@ -44,6 +44,9 @@ public class SegmentLoaderConfig @JsonProperty("numLoadingThreads") private int numLoadingThreads = 1; + @JsonProperty("numBootstrapThreads") + private Integer numBootstrapThreads = null; + @JsonProperty private File infoDir = null; @@ -72,6 +75,10 @@ public class SegmentLoaderConfig return numLoadingThreads; } + public int getNumBootstrapThreads() { + return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads; + } + public File getInfoDir() { if (infoDir == null) { diff --git a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java index 5405bbb5da4..c4c3f9e1788 100644 --- a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java @@ -53,7 +53,6 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler private volatile PathChildrenCache loadQueueCache; private volatile boolean started = false; - private final ListeningExecutorService loadingExec; public BaseZkCoordinator( ObjectMapper jsonMapper, @@ -68,12 +67,6 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler this.config = config; this.me = me; this.curator = curator; - this.loadingExec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool( - config.getNumLoadingThreads(), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build() - ) - ); } @LifecycleStart @@ -95,7 +88,10 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler loadQueueLocation, true, true, - loadingExec + Executors.newFixedThreadPool( + config.getNumLoadingThreads(), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build() + ) ); try { @@ -217,9 +213,4 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler public abstract void loadLocalCache(); public abstract DataSegmentChangeHandler getDataSegmentChangeHandler(); - - public ListeningExecutorService getLoadingExecutor() - { - return loadingExec; - } } diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index 29578ed68ba..0a9523aaba5 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -20,12 +20,13 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Queues; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.emitter.EmittingLogger; +import io.druid.concurrent.Execs; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoadingException; import io.druid.server.initialization.ZkPathsConfig; @@ -36,13 +37,16 @@ import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -183,71 +187,71 @@ public class ZkCoordinator extends BaseZkCoordinator } } - public void addSegments(Collection segments, final DataSegmentChangeCallback callback) + private void addSegments(Collection segments, final DataSegmentChangeCallback callback) { - try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer = - new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) { + ExecutorService loadingExecutor = null; + try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer = + new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) { + backgroundSegmentAnnouncer.startAnnouncing(); - final List segmentLoading = Lists.newArrayList(); + loadingExecutor = Execs.multiThreaded(config.getNumBootstrapThreads(), "ZkCoordinator-loading-%s"); final int numSegments = segments.size(); - final AtomicLong counter = new AtomicLong(0); + final CountDownLatch latch = new CountDownLatch(numSegments); + final AtomicInteger counter = new AtomicInteger(0); + final CopyOnWriteArrayList failedSegments = new CopyOnWriteArrayList<>(); for (final DataSegment segment : segments) { - segmentLoading.add( - getLoadingExecutor().submit( - new Callable() - { - @Override - public Void call() throws SegmentLoadingException - { + loadingExecutor.submit( + new Runnable() { + @Override + public void run() { + try { + log.info("Loading segment[%d/%d][%s]", counter.getAndIncrement(), numSegments, segment.getIdentifier()); + final boolean loaded = loadSegment(segment, callback); + if (loaded) { try { - log.info("Loading segment[%d/%d][%s]", counter.getAndIncrement(), numSegments, segment.getIdentifier()); - final boolean loaded = loadSegment(segment, callback); - if (loaded) { - try { - backgroundSegmentAnnouncer.announceSegment(segment); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SegmentLoadingException(e, "Loading Interrupted"); - } - } - return null; - } catch(SegmentLoadingException e) { - log.error(e, "[%s] failed to load", segment.getIdentifier()); - throw e; + backgroundSegmentAnnouncer.announceSegment(segment); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SegmentLoadingException(e, "Loading Interrupted"); } } + } catch (SegmentLoadingException e) { + log.error(e, "[%s] failed to load", segment.getIdentifier()); + failedSegments.add(segment); + } finally { + latch.countDown(); } - ) + } + } ); } - int failed = 0; - for(ListenableFuture future : segmentLoading) { - try { - future.get(); - } catch(InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SegmentLoadingException(e, "Loading Interrupted"); - } catch(ExecutionException e) { - failed++; + try{ + latch.await(); + + if(failedSegments.size() > 0) { + log.makeAlert("%,d errors seen while loading segments", failedSegments.size()) + .addData("failedSegments", failedSegments); } - } - if(failed > 0) { - throw new SegmentLoadingException("%,d errors seen while loading segments", failed); + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); + log.makeAlert(e, "LoadingInterrupted"); } backgroundSegmentAnnouncer.finishAnnouncing(); } catch (SegmentLoadingException e) { - log.makeAlert(e, "Failed to load segments") - .addData("segments", segments) - .emit(); + log.makeAlert(e, "Failed to load segments -- likely problem with announcing.") + .addData("numSegments", segments.size()) + .emit(); } finally { callback.execute(); + if (loadingExecutor != null) { + loadingExecutor.shutdownNow(); + } } }