diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java index bbed8d27f47..9abe5fc4497 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java @@ -43,6 +43,9 @@ public class SegmentLoaderConfig @JsonProperty("announceIntervalMillis") private int announceIntervalMillis = 5 * 1000; // 5 seconds + @JsonProperty("numLoadingThreads") + private int numLoadingThreads = 1; + @JsonProperty private File infoDir = null; @@ -66,6 +69,11 @@ public class SegmentLoaderConfig return announceIntervalMillis; } + public int getNumLoadingThreads() + { + return numLoadingThreads; + } + public File getInfoDir() { if (infoDir == null) { diff --git a/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java b/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java index 9cbafaaa4dd..633ce904059 100644 --- a/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java +++ b/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java @@ -26,6 +26,7 @@ import com.metamx.common.logger.Logger; import io.druid.client.ServerView; import io.druid.concurrent.Execs; import io.druid.db.DatabaseSegmentManager; +import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.realtime.DbSegmentPublisher; import io.druid.server.coordination.BaseZkCoordinator; import io.druid.server.coordination.DataSegmentChangeCallback; @@ -53,6 +54,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator public BridgeZkCoordinator( ObjectMapper jsonMapper, ZkPathsConfig zkPaths, + SegmentLoaderConfig config, DruidServerMetadata me, @Bridge CuratorFramework curator, DbSegmentPublisher dbSegmentPublisher, @@ -60,7 +62,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator ServerView serverView ) { - super(jsonMapper, zkPaths, me, curator); + super(jsonMapper, zkPaths, config, me, curator); this.dbSegmentPublisher = dbSegmentPublisher; this.databaseSegmentManager = databaseSegmentManager; diff --git a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java index e751041747b..f31cfb6311c 100644 --- a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java @@ -21,10 +21,13 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; +import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.server.initialization.ZkPathsConfig; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; @@ -34,6 +37,8 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** */ @@ -45,23 +50,33 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler private final ObjectMapper jsonMapper; private final ZkPathsConfig zkPaths; + private final SegmentLoaderConfig config; private final DruidServerMetadata me; private final CuratorFramework curator; private volatile PathChildrenCache loadQueueCache; private volatile boolean started; + private final ListeningExecutorService loadingExec; public BaseZkCoordinator( ObjectMapper jsonMapper, ZkPathsConfig zkPaths, + SegmentLoaderConfig config, DruidServerMetadata me, CuratorFramework curator ) { this.jsonMapper = jsonMapper; this.zkPaths = zkPaths; + this.config = config; this.me = me; this.curator = curator; + this.loadingExec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool( + config.getNumLoadingThreads(), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build() + ) + ); } @LifecycleStart @@ -83,7 +98,7 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler loadQueueLocation, true, true, - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build() + loadingExec ); try { @@ -200,4 +215,9 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler public abstract void loadLocalCache(); public abstract DataSegmentChangeHandler getDataSegmentChangeHandler(); + + public ListeningExecutorService getLoadingExecutor() + { + return loadingExec; + } } diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index ec2510df1fb..70affc17854 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -22,6 +22,7 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Queues; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import com.metamx.common.ISE; @@ -36,6 +37,7 @@ import org.apache.curator.framework.CuratorFramework; import java.io.File; import java.io.IOException; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; @@ -66,7 +68,7 @@ public class ZkCoordinator extends BaseZkCoordinator ScheduledExecutorFactory factory ) { - super(jsonMapper, zkPaths, me, curator); + super(jsonMapper, zkPaths, config, me, curator); this.jsonMapper = jsonMapper; this.config = config; @@ -127,42 +129,47 @@ public class ZkCoordinator extends BaseZkCoordinator return ZkCoordinator.this; } + private boolean loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException + { + final boolean loaded; + try { + loaded = serverManager.loadSegment(segment); + } + catch (Exception e) { + removeSegment(segment, callback); + throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier()); + } + + if (loaded) { + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); + if (!segmentInfoCacheFile.exists()) { + try { + jsonMapper.writeValue(segmentInfoCacheFile, segment); + } + catch (IOException e) { + removeSegment(segment, callback); + throw new SegmentLoadingException( + e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile + ); + } + } + } + return loaded; + } + @Override public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) { try { log.info("Loading segment %s", segment.getIdentifier()); - - final boolean loaded; - try { - loaded = serverManager.loadSegment(segment); - } - catch (Exception e) { - removeSegment(segment, callback); - throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier()); - } - - if (loaded) { - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); - if (!segmentInfoCacheFile.exists()) { - try { - jsonMapper.writeValue(segmentInfoCacheFile, segment); - } - catch (IOException e) { - removeSegment(segment, callback); - throw new SegmentLoadingException( - e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile - ); - } - } - + if(loadSegment(segment, callback)) { try { announcer.announceSegment(segment); } catch (IOException e) { throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier()); } - } + }; } catch (SegmentLoadingException e) { log.makeAlert(e, "Failed to load segment for dataSource") @@ -174,54 +181,58 @@ public class ZkCoordinator extends BaseZkCoordinator } } - public void addSegments(Iterable segments, DataSegmentChangeCallback callback) + public void addSegments(Iterable segments, final DataSegmentChangeCallback callback) { try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer = new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) { backgroundSegmentAnnouncer.startAnnouncing(); - final List segmentFailures = Lists.newArrayList(); - for (DataSegment segment : segments) { - log.info("Loading segment %s", segment.getIdentifier()); + final List> segmentLoading = Lists.newArrayList(); - final boolean loaded; - try { - loaded = serverManager.loadSegment(segment); - } - catch (Exception e) { - log.error(e, "Exception loading segment[%s]", segment.getIdentifier()); - removeSegment(segment, callback); - segmentFailures.add(segment.getIdentifier()); - continue; - } - - if (loaded) { - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); - if (!segmentInfoCacheFile.exists()) { - try { - jsonMapper.writeValue(segmentInfoCacheFile, segment); - } - catch (IOException e) { - log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile); - removeSegment(segment, callback); - segmentFailures.add(segment.getIdentifier()); - continue; - } - } - - try { - backgroundSegmentAnnouncer.announceSegment(segment); - } catch(InterruptedException e) { - throw new SegmentLoadingException(e, "Loading Interrupted"); - } - } + for (final DataSegment segment : segments) { + segmentLoading.add( + getLoadingExecutor().submit( + new Callable() + { + @Override + public Boolean call() throws SegmentLoadingException + { + try { + log.info("Loading segment %s", segment.getIdentifier()); + final boolean loaded = loadSegment(segment, callback); + if (loaded) { + try { + backgroundSegmentAnnouncer.announceSegment(segment); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SegmentLoadingException(e, "Loading Interrupted"); + } + } + return loaded; + } catch(SegmentLoadingException e) { + log.error(e, "[%s] failed to load", segment.getIdentifier()); + throw e; + } + } + } + ) + ); } - if (!segmentFailures.isEmpty()) { - for (String segmentFailure : segmentFailures) { - log.error("%s failed to load", segmentFailure); + int failed = 0; + for(ListenableFuture future : segmentLoading) { + try { + future.get(); + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SegmentLoadingException(e, "Loading Interrupted"); + } catch(ExecutionException e) { + failed++; } - throw new SegmentLoadingException("%,d errors seen while loading segments", segmentFailures.size()); + } + if(failed > 0) { + throw new SegmentLoadingException("%,d errors seen while loading segments", failed); } backgroundSegmentAnnouncer.finishAnnouncing(); @@ -292,7 +303,7 @@ public class ZkCoordinator extends BaseZkCoordinator private volatile ScheduledFuture startedAnnouncing = null; private volatile ScheduledFuture nextAnnoucement = null; - private BackgroundSegmentAnnouncer( + public BackgroundSegmentAnnouncer( DataSegmentAnnouncer announcer, ScheduledExecutorService exec, int intervalMillis @@ -366,6 +377,7 @@ public class ZkCoordinator extends BaseZkCoordinator doneAnnouncing.get(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new SegmentLoadingException(e, "Loading Interrupted"); } catch (ExecutionException e) { diff --git a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java index 0136a99482e..670fd892c24 100644 --- a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java +++ b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java @@ -31,6 +31,7 @@ import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.announcement.Announcer; import io.druid.db.DatabaseSegmentManager; import io.druid.jackson.DefaultObjectMapper; +import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.realtime.DbSegmentPublisher; import io.druid.server.DruidNode; import io.druid.server.coordination.BatchDataSegmentAnnouncer; @@ -156,6 +157,7 @@ public class DruidClusterBridgeTest BridgeZkCoordinator bridgeZkCoordinator = new BridgeZkCoordinator( jsonMapper, zkPathsConfig, + new SegmentLoaderConfig(), metadata, remoteCf, dbSegmentPublisher,