diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
index acfb44184ad..a5c67b595f7 100644
--- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
@@ -44,6 +44,9 @@ public class SegmentLoaderConfig
   @JsonProperty("numLoadingThreads")
   private int numLoadingThreads = 1;
 
+  @JsonProperty("numBootstrapThreads")
+  private Integer numBootstrapThreads = null;
+
   @JsonProperty
   private File infoDir = null;
 
@@ -72,6 +75,10 @@ public class SegmentLoaderConfig
     return numLoadingThreads;
   }
 
+  public int getNumBootstrapThreads() {
+    return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads;
+  }
+
   public File getInfoDir()
   {
     if (infoDir == null) {
diff --git a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java
index 5405bbb5da4..c4c3f9e1788 100644
--- a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java
@@ -53,7 +53,6 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
 
   private volatile PathChildrenCache loadQueueCache;
   private volatile boolean started = false;
-  private final ListeningExecutorService loadingExec;
 
   public BaseZkCoordinator(
       ObjectMapper jsonMapper,
@@ -68,12 +67,6 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
     this.config = config;
     this.me = me;
     this.curator = curator;
-    this.loadingExec = MoreExecutors.listeningDecorator(
-        Executors.newFixedThreadPool(
-            config.getNumLoadingThreads(),
-            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
-        )
-    );
   }
 
   @LifecycleStart
@@ -95,7 +88,10 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
         loadQueueLocation,
         true,
         true,
-        loadingExec
+        Executors.newFixedThreadPool(
+            config.getNumLoadingThreads(),
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
+        )
     );
 
     try {
@@ -217,9 +213,4 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
   public abstract void loadLocalCache();
 
   public abstract DataSegmentChangeHandler getDataSegmentChangeHandler();
-
-  public ListeningExecutorService getLoadingExecutor()
-  {
-    return loadingExec;
-  }
 }
diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
index 3804db09041..0a9523aaba5 100644
--- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
@@ -20,12 +20,13 @@ package io.druid.server.coordination;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import com.metamx.common.ISE;
 import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.emitter.EmittingLogger;
+import io.druid.concurrent.Execs;
 import io.druid.segment.loading.SegmentLoaderConfig;
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.server.initialization.ZkPathsConfig;
@@ -34,13 +35,18 @@ import org.apache.curator.framework.CuratorFramework;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  */
@@ -86,8 +92,10 @@ public class ZkCoordinator extends BaseZkCoordinator
     }
 
     List<DataSegment> cachedSegments = Lists.newArrayList();
-    for (File file : baseDir.listFiles()) {
-      log.info("Loading segment cache file [%s]", file);
+    File[] segmentsToLoad = baseDir.listFiles();
+    for (int i = 0; i < segmentsToLoad.length; i++) {
+      File file = segmentsToLoad[i];
+      log.info("Loading segment cache file [%d/%d][%s].", i, segmentsToLoad.length, file);
       try {
         DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
         if (serverManager.isSegmentCached(segment)) {
@@ -179,69 +187,71 @@
     }
   }
 
-  public void addSegments(Iterable<DataSegment> segments, final DataSegmentChangeCallback callback)
+  private void addSegments(Collection<DataSegment> segments, final DataSegmentChangeCallback callback)
   {
-    try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
-        new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
+    ExecutorService loadingExecutor = null;
+    try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
+             new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
+
       backgroundSegmentAnnouncer.startAnnouncing();
 
-      final List<ListenableFuture> segmentLoading = Lists.newArrayList();
+      loadingExecutor = Execs.multiThreaded(config.getNumBootstrapThreads(), "ZkCoordinator-loading-%s");
 
+      final int numSegments = segments.size();
+      final CountDownLatch latch = new CountDownLatch(numSegments);
+      final AtomicInteger counter = new AtomicInteger(0);
+      final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>();
       for (final DataSegment segment : segments) {
-        segmentLoading.add(
-            getLoadingExecutor().submit(
-                new Callable<Void>()
-                {
-                  @Override
-                  public Void call() throws SegmentLoadingException
-                  {
+        loadingExecutor.submit(
+            new Runnable() {
+              @Override
+              public void run() {
+                try {
+                  log.info("Loading segment[%d/%d][%s]", counter.getAndIncrement(), numSegments, segment.getIdentifier());
+                  final boolean loaded = loadSegment(segment, callback);
+                  if (loaded) {
                     try {
-                      log.info("Loading segment %s", segment.getIdentifier());
-                      final boolean loaded = loadSegment(segment, callback);
-                      if (loaded) {
-                        try {
-                          backgroundSegmentAnnouncer.announceSegment(segment);
-                        }
-                        catch (InterruptedException e) {
-                          Thread.currentThread().interrupt();
-                          throw new SegmentLoadingException(e, "Loading Interrupted");
-                        }
-                      }
-                      return null;
-                    } catch(SegmentLoadingException e) {
-                      log.error(e, "[%s] failed to load", segment.getIdentifier());
-                      throw e;
+                      backgroundSegmentAnnouncer.announceSegment(segment);
+                    } catch (InterruptedException e) {
+                      Thread.currentThread().interrupt();
+                      throw new SegmentLoadingException(e, "Loading Interrupted");
                     }
                   }
+                } catch (SegmentLoadingException e) {
+                  log.error(e, "[%s] failed to load", segment.getIdentifier());
+                  failedSegments.add(segment);
+                } finally {
+                  latch.countDown();
                 }
-            )
+              }
+            }
         );
       }
 
-      int failed = 0;
-      for(ListenableFuture future : segmentLoading) {
-        try {
-          future.get();
-        } catch(InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new SegmentLoadingException(e, "Loading Interrupted");
-        } catch(ExecutionException e) {
-          failed++;
+      try {
+        latch.await();
+
+        if (failedSegments.size() > 0) {
+          log.makeAlert("%,d errors seen while loading segments", failedSegments.size())
+             .addData("failedSegments", failedSegments).emit();
         }
-      }
-      if(failed > 0) {
-        throw new SegmentLoadingException("%,d errors seen while loading segments", failed);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        log.makeAlert(e, "LoadingInterrupted").emit();
       }
 
       backgroundSegmentAnnouncer.finishAnnouncing();
     }
     catch (SegmentLoadingException e) {
-      log.makeAlert(e, "Failed to load segments")
-         .addData("segments", segments)
-         .emit();
+      log.makeAlert(e, "Failed to load segments -- likely problem with announcing.")
+         .addData("numSegments", segments.size())
+         .emit();
     }
     finally {
       callback.execute();
+      if (loadingExecutor != null) {
+        loadingExecutor.shutdownNow();
+      }
     }
   }
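Taken together, these changes move bootstrap segment loading onto a dedicated pool sized by the new numBootstrapThreads property (falling back to numLoadingThreads when unset, per getNumBootstrapThreads()), and replace the old future-collection loop with a CountDownLatch plus a CopyOnWriteArrayList of failed segments, so one bad segment no longer aborts the rest of the bootstrap. The sketch below illustrates that load-then-await pattern in isolation; it is not Druid code: the segment identifiers, the loadSegment stub, and the pool size of 2 are placeholders, and a plain fixed thread pool stands in for Execs.multiThreaded.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class BootstrapLoadSketch
{
  // Stand-in for ZkCoordinator.loadSegment(segment, callback).
  private static boolean loadSegment(String segmentId) throws Exception
  {
    return true;
  }

  public static void main(String[] args) throws InterruptedException
  {
    final List<String> segments = List.of("seg-a", "seg-b", "seg-c");

    // Stand-in for Execs.multiThreaded(config.getNumBootstrapThreads(), "ZkCoordinator-loading-%s").
    final ExecutorService loadingExecutor = Executors.newFixedThreadPool(2);
    final CountDownLatch latch = new CountDownLatch(segments.size());
    final AtomicInteger counter = new AtomicInteger(0);
    final CopyOnWriteArrayList<String> failedSegments = new CopyOnWriteArrayList<>();

    for (final String segment : segments) {
      loadingExecutor.submit(() -> {
        try {
          System.out.printf("Loading segment[%d/%d][%s]%n", counter.getAndIncrement(), segments.size(), segment);
          loadSegment(segment);
        }
        catch (Exception e) {
          failedSegments.add(segment); // record the failure, keep loading the rest
        }
        finally {
          latch.countDown(); // always count down so await() cannot hang
        }
      });
    }

    latch.await(); // block until every segment has been loaded or has failed
    if (!failedSegments.isEmpty()) {
      System.err.printf("%d errors seen while loading segments%n", failedSegments.size());
    }
    loadingExecutor.shutdownNow();
  }
}
```

Since SegmentLoaderConfig is bound under the druid.segmentCache prefix, the new pool would typically be sized with something like druid.segmentCache.numBootstrapThreads=8 (an illustrative value) in a historical node's runtime.properties; leaving it unset keeps the numLoadingThreads fallback. Shutting the executor down in the finally block keeps the bootstrap threads from outliving startup.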