From 32b6135f3d7379910f7452417afbddb2558f4ce8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 18 Sep 2014 12:01:28 -0700 Subject: [PATCH 1/6] background announce segment cache --- .../segment/loading/SegmentLoaderConfig.java | 8 + .../server/coordination/ZkCoordinator.java | 144 ++++++++++++++++-- 2 files changed, 140 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java index 2cba40194a0..bbed8d27f47 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java @@ -40,6 +40,9 @@ public class SegmentLoaderConfig @JsonProperty("dropSegmentDelayMillis") private int dropSegmentDelayMillis = 30 * 1000; // 30 seconds + @JsonProperty("announceIntervalMillis") + private int announceIntervalMillis = 5 * 1000; // 5 seconds + @JsonProperty private File infoDir = null; @@ -58,6 +61,11 @@ public class SegmentLoaderConfig return dropSegmentDelayMillis; } + public int getAnnounceIntervalMillis() + { + return announceIntervalMillis; + } + public File getInfoDir() { if (infoDir == null) { diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index b0611077aa3..ec2510df1fb 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -21,7 +21,10 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import com.google.common.collect.Queues; +import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; +import com.metamx.common.ISE; import com.metamx.common.concurrent.ScheduledExecutorFactory; import 
com.metamx.emitter.EmittingLogger; import io.druid.segment.loading.SegmentLoaderConfig; @@ -33,7 +36,10 @@ import org.apache.curator.framework.CuratorFramework; import java.io.File; import java.io.IOException; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** @@ -170,10 +176,11 @@ public class ZkCoordinator extends BaseZkCoordinator public void addSegments(Iterable segments, DataSegmentChangeCallback callback) { - try { - final List segmentFailures = Lists.newArrayList(); - final List validSegments = Lists.newArrayList(); + try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer = + new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) { + backgroundSegmentAnnouncer.startAnnouncing(); + final List segmentFailures = Lists.newArrayList(); for (DataSegment segment : segments) { log.info("Loading segment %s", segment.getIdentifier()); @@ -202,26 +209,25 @@ public class ZkCoordinator extends BaseZkCoordinator } } - validSegments.add(segment); + try { + backgroundSegmentAnnouncer.announceSegment(segment); + } catch(InterruptedException e) { + throw new SegmentLoadingException(e, "Loading Interrupted"); + } } } - try { - announcer.announceSegments(validSegments); - } - catch (IOException e) { - throw new SegmentLoadingException(e, "Failed to announce segments[%s]", segments); - } - if (!segmentFailures.isEmpty()) { for (String segmentFailure : segmentFailures) { log.error("%s failed to load", segmentFailure); } throw new SegmentLoadingException("%,d errors seen while loading segments", segmentFailures.size()); } + + backgroundSegmentAnnouncer.finishAnnouncing(); } catch (SegmentLoadingException e) { - log.makeAlert(e, "Failed to load segments for dataSource") + log.makeAlert(e, "Failed to load segments") 
.addData("segments", segments) .emit(); } @@ -272,4 +278,118 @@ public class ZkCoordinator extends BaseZkCoordinator callback.execute(); } } + + private static class BackgroundSegmentAnnouncer implements AutoCloseable { + private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class); + + private final int intervalMillis; + private final DataSegmentAnnouncer announcer; + private final ScheduledExecutorService exec; + private final LinkedBlockingQueue queue; + private final SettableFuture doneAnnouncing; + + private volatile boolean finished = false; + private volatile ScheduledFuture startedAnnouncing = null; + private volatile ScheduledFuture nextAnnoucement = null; + + private BackgroundSegmentAnnouncer( + DataSegmentAnnouncer announcer, + ScheduledExecutorService exec, + int intervalMillis + ) + { + this.announcer = announcer; + this.exec = exec; + this.intervalMillis = intervalMillis; + this.queue = Queues.newLinkedBlockingQueue(); + this.doneAnnouncing = SettableFuture.create(); + } + + public void announceSegment(final DataSegment segment) throws InterruptedException + { + if (finished) { + throw new ISE("Announce segment called after finishAnnouncing"); + } + queue.put(segment); + } + + public void startAnnouncing() + { + if (intervalMillis <= 0) { + return; + } + + log.info("Starting background segment announcing task"); + + // schedule background announcing task + nextAnnoucement = startedAnnouncing = exec.schedule( + new Runnable() + { + @Override + public void run() + { + try { + if (!(finished && queue.isEmpty())) { + List segments = Lists.newLinkedList(); + queue.drainTo(segments); + try { + announcer.announceSegments(segments); + nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS); + } + catch (IOException e) { + doneAnnouncing.setException( + new SegmentLoadingException(e, "Failed to announce segments[%s]", segments) + ); + } + } else { + doneAnnouncing.set(true); + } + } + catch (Exception e) 
{ + doneAnnouncing.setException(e); + } + } + }, + intervalMillis, + TimeUnit.MILLISECONDS + ); + } + + public void finishAnnouncing() throws SegmentLoadingException + { + finished = true; + + if (startedAnnouncing != null && (startedAnnouncing.isDone() || !startedAnnouncing.cancel(false))) { + log.info("Waiting for background segment announcing task to complete"); + // background announcing already started, wait for it to complete + try { + doneAnnouncing.get(); + } + catch (InterruptedException e) { + throw new SegmentLoadingException(e, "Loading Interrupted"); + } + catch (ExecutionException e) { + throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed"); + } + } else { + // background task has not started yet, announcing immediately + try { + announcer.announceSegments(queue); + } + catch (Exception e) { + throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue); + } + } + log.info("Completed background segment announcing"); + } + + @Override + public void close() + { + finished = true; + if (nextAnnoucement != null) { + nextAnnoucement.cancel(false); + } + } + } } From 815ebeee256ef6d1221d979e3e866d2c3450110b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 18 Sep 2014 12:06:50 -0700 Subject: [PATCH 2/6] add docs for background segments loading --- docs/content/Configuration.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/content/Configuration.md b/docs/content/Configuration.md index 267d43c7025..4681085b2b2 100644 --- a/docs/content/Configuration.md +++ b/docs/content/Configuration.md @@ -155,8 +155,9 @@ Druid storage nodes maintain information about segments they have already downlo |--------|-----------|-------| |`druid.segmentCache.locations`|Segments assigned to a Historical node are first stored on the local file system (in a disk cache) and then served by the Historical node. These locations define where that local cache resides. 
| none (no caching) | |`druid.segmentCache.deleteOnRemove`|Delete segment files from cache once a node is no longer serving a segment.|true| -|`druid.segmentCache.dropSegmentDelayMillis`|How long a node delays before completely dropping segment.|5 minutes| +|`druid.segmentCache.dropSegmentDelayMillis`|How long a node delays before completely dropping segment.|30000 (30 seconds)| |`druid.segmentCache.infoDir`|Historical nodes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir| +|`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing|5000 (5 seconds)| ### Jetty Server Module From 12449481e391c67eac8143d073a167d68f768f7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 22 Sep 2014 16:33:25 -0700 Subject: [PATCH 3/6] parallelize segment loading --- .../segment/loading/SegmentLoaderConfig.java | 8 + .../server/bridge/BridgeZkCoordinator.java | 4 +- .../coordination/BaseZkCoordinator.java | 22 ++- .../server/coordination/ZkCoordinator.java | 144 ++++++++++-------- .../server/bridge/DruidClusterBridgeTest.java | 2 + 5 files changed, 112 insertions(+), 68 deletions(-) diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java index bbed8d27f47..9abe5fc4497 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java @@ -43,6 +43,9 @@ public class SegmentLoaderConfig @JsonProperty("announceIntervalMillis") private int announceIntervalMillis = 5 * 1000; // 5 seconds + 
@JsonProperty("numLoadingThreads") + private int numLoadingThreads = 1; + @JsonProperty private File infoDir = null; @@ -66,6 +69,11 @@ public class SegmentLoaderConfig return announceIntervalMillis; } + public int getNumLoadingThreads() + { + return numLoadingThreads; + } + public File getInfoDir() { if (infoDir == null) { diff --git a/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java b/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java index 9cbafaaa4dd..633ce904059 100644 --- a/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java +++ b/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java @@ -26,6 +26,7 @@ import com.metamx.common.logger.Logger; import io.druid.client.ServerView; import io.druid.concurrent.Execs; import io.druid.db.DatabaseSegmentManager; +import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.realtime.DbSegmentPublisher; import io.druid.server.coordination.BaseZkCoordinator; import io.druid.server.coordination.DataSegmentChangeCallback; @@ -53,6 +54,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator public BridgeZkCoordinator( ObjectMapper jsonMapper, ZkPathsConfig zkPaths, + SegmentLoaderConfig config, DruidServerMetadata me, @Bridge CuratorFramework curator, DbSegmentPublisher dbSegmentPublisher, @@ -60,7 +62,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator ServerView serverView ) { - super(jsonMapper, zkPaths, me, curator); + super(jsonMapper, zkPaths, config, me, curator); this.dbSegmentPublisher = dbSegmentPublisher; this.databaseSegmentManager = databaseSegmentManager; diff --git a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java index e751041747b..f31cfb6311c 100644 --- a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java @@ 
-21,10 +21,13 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; +import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.server.initialization.ZkPathsConfig; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; @@ -34,6 +37,8 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** */ @@ -45,23 +50,33 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler private final ObjectMapper jsonMapper; private final ZkPathsConfig zkPaths; + private final SegmentLoaderConfig config; private final DruidServerMetadata me; private final CuratorFramework curator; private volatile PathChildrenCache loadQueueCache; private volatile boolean started; + private final ListeningExecutorService loadingExec; public BaseZkCoordinator( ObjectMapper jsonMapper, ZkPathsConfig zkPaths, + SegmentLoaderConfig config, DruidServerMetadata me, CuratorFramework curator ) { this.jsonMapper = jsonMapper; this.zkPaths = zkPaths; + this.config = config; this.me = me; this.curator = curator; + this.loadingExec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool( + config.getNumLoadingThreads(), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build() + ) + ); } @LifecycleStart @@ -83,7 +98,7 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler 
loadQueueLocation, true, true, - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build() + loadingExec ); try { @@ -200,4 +215,9 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler public abstract void loadLocalCache(); public abstract DataSegmentChangeHandler getDataSegmentChangeHandler(); + + public ListeningExecutorService getLoadingExecutor() + { + return loadingExec; + } } diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index ec2510df1fb..70affc17854 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -22,6 +22,7 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Queues; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import com.metamx.common.ISE; @@ -36,6 +37,7 @@ import org.apache.curator.framework.CuratorFramework; import java.io.File; import java.io.IOException; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; @@ -66,7 +68,7 @@ public class ZkCoordinator extends BaseZkCoordinator ScheduledExecutorFactory factory ) { - super(jsonMapper, zkPaths, me, curator); + super(jsonMapper, zkPaths, config, me, curator); this.jsonMapper = jsonMapper; this.config = config; @@ -127,42 +129,47 @@ public class ZkCoordinator extends BaseZkCoordinator return ZkCoordinator.this; } + private boolean loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException + { + final boolean loaded; + try { + 
loaded = serverManager.loadSegment(segment); + } + catch (Exception e) { + removeSegment(segment, callback); + throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier()); + } + + if (loaded) { + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); + if (!segmentInfoCacheFile.exists()) { + try { + jsonMapper.writeValue(segmentInfoCacheFile, segment); + } + catch (IOException e) { + removeSegment(segment, callback); + throw new SegmentLoadingException( + e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile + ); + } + } + } + return loaded; + } + @Override public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) { try { log.info("Loading segment %s", segment.getIdentifier()); - - final boolean loaded; - try { - loaded = serverManager.loadSegment(segment); - } - catch (Exception e) { - removeSegment(segment, callback); - throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier()); - } - - if (loaded) { - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); - if (!segmentInfoCacheFile.exists()) { - try { - jsonMapper.writeValue(segmentInfoCacheFile, segment); - } - catch (IOException e) { - removeSegment(segment, callback); - throw new SegmentLoadingException( - e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile - ); - } - } - + if(loadSegment(segment, callback)) { try { announcer.announceSegment(segment); } catch (IOException e) { throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier()); } - } + }; } catch (SegmentLoadingException e) { log.makeAlert(e, "Failed to load segment for dataSource") @@ -174,54 +181,58 @@ public class ZkCoordinator extends BaseZkCoordinator } } - public void addSegments(Iterable segments, DataSegmentChangeCallback callback) + public void addSegments(Iterable segments, final DataSegmentChangeCallback 
callback) { try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer = new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) { backgroundSegmentAnnouncer.startAnnouncing(); - final List segmentFailures = Lists.newArrayList(); - for (DataSegment segment : segments) { - log.info("Loading segment %s", segment.getIdentifier()); + final List> segmentLoading = Lists.newArrayList(); - final boolean loaded; - try { - loaded = serverManager.loadSegment(segment); - } - catch (Exception e) { - log.error(e, "Exception loading segment[%s]", segment.getIdentifier()); - removeSegment(segment, callback); - segmentFailures.add(segment.getIdentifier()); - continue; - } - - if (loaded) { - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); - if (!segmentInfoCacheFile.exists()) { - try { - jsonMapper.writeValue(segmentInfoCacheFile, segment); - } - catch (IOException e) { - log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile); - removeSegment(segment, callback); - segmentFailures.add(segment.getIdentifier()); - continue; - } - } - - try { - backgroundSegmentAnnouncer.announceSegment(segment); - } catch(InterruptedException e) { - throw new SegmentLoadingException(e, "Loading Interrupted"); - } - } + for (final DataSegment segment : segments) { + segmentLoading.add( + getLoadingExecutor().submit( + new Callable() + { + @Override + public Boolean call() throws SegmentLoadingException + { + try { + log.info("Loading segment %s", segment.getIdentifier()); + final boolean loaded = loadSegment(segment, callback); + if (loaded) { + try { + backgroundSegmentAnnouncer.announceSegment(segment); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SegmentLoadingException(e, "Loading Interrupted"); + } + } + return loaded; + } catch(SegmentLoadingException e) { + log.error(e, "[%s] failed to load", segment.getIdentifier()); + throw e; + } + } + } + ) + ); 
} - if (!segmentFailures.isEmpty()) { - for (String segmentFailure : segmentFailures) { - log.error("%s failed to load", segmentFailure); + int failed = 0; + for(ListenableFuture future : segmentLoading) { + try { + future.get(); + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SegmentLoadingException(e, "Loading Interrupted"); + } catch(ExecutionException e) { + failed++; } - throw new SegmentLoadingException("%,d errors seen while loading segments", segmentFailures.size()); + } + if(failed > 0) { + throw new SegmentLoadingException("%,d errors seen while loading segments", failed); } backgroundSegmentAnnouncer.finishAnnouncing(); @@ -292,7 +303,7 @@ public class ZkCoordinator extends BaseZkCoordinator private volatile ScheduledFuture startedAnnouncing = null; private volatile ScheduledFuture nextAnnoucement = null; - private BackgroundSegmentAnnouncer( + public BackgroundSegmentAnnouncer( DataSegmentAnnouncer announcer, ScheduledExecutorService exec, int intervalMillis @@ -366,6 +377,7 @@ public class ZkCoordinator extends BaseZkCoordinator doneAnnouncing.get(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new SegmentLoadingException(e, "Loading Interrupted"); } catch (ExecutionException e) { diff --git a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java index 0136a99482e..670fd892c24 100644 --- a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java +++ b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java @@ -31,6 +31,7 @@ import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.announcement.Announcer; import io.druid.db.DatabaseSegmentManager; import io.druid.jackson.DefaultObjectMapper; +import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.realtime.DbSegmentPublisher; import io.druid.server.DruidNode; 
import io.druid.server.coordination.BatchDataSegmentAnnouncer; @@ -156,6 +157,7 @@ public class DruidClusterBridgeTest BridgeZkCoordinator bridgeZkCoordinator = new BridgeZkCoordinator( jsonMapper, zkPathsConfig, + new SegmentLoaderConfig(), metadata, remoteCf, dbSegmentPublisher, From d7b39fa7ea5db2abe11b5950c6fb3747caa14c6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 23 Sep 2014 09:30:36 -0700 Subject: [PATCH 4/6] add documentation --- docs/content/Configuration.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/content/Configuration.md b/docs/content/Configuration.md index 4681085b2b2..5d09851a924 100644 --- a/docs/content/Configuration.md +++ b/docs/content/Configuration.md @@ -157,7 +157,8 @@ Druid storage nodes maintain information about segments they have already downlo |`druid.segmentCache.deleteOnRemove`|Delete segment files from cache once a node is no longer serving a segment.|true| |`druid.segmentCache.dropSegmentDelayMillis`|How long a node delays before completely dropping segment.|30000 (30 seconds)| |`druid.segmentCache.infoDir`|Historical nodes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir| -|`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing|5000 (5 seconds)| +|`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. 
Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)| +|`druid.segmentCache.numLoadingThreads`|How many segments to load concurrently from deep storage.|1| ### Jetty Server Module From 05d4f71ddc98bc2e88def12f78854fc0806e645e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 23 Sep 2014 14:55:22 -0700 Subject: [PATCH 5/6] fix background annoucing race condition --- .../server/coordination/ZkCoordinator.java | 91 +++++++++++-------- 1 file changed, 53 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index 70affc17854..8024edf3daa 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -187,15 +187,15 @@ public class ZkCoordinator extends BaseZkCoordinator new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) { backgroundSegmentAnnouncer.startAnnouncing(); - final List> segmentLoading = Lists.newArrayList(); + final List segmentLoading = Lists.newArrayList(); for (final DataSegment segment : segments) { segmentLoading.add( getLoadingExecutor().submit( - new Callable() + new Callable() { @Override - public Boolean call() throws SegmentLoadingException + public Void call() throws SegmentLoadingException { try { log.info("Loading segment %s", segment.getIdentifier()); @@ -209,7 +209,7 @@ public class ZkCoordinator extends BaseZkCoordinator throw new SegmentLoadingException(e, "Loading Interrupted"); } } - return loaded; + return null; } catch(SegmentLoadingException e) { log.error(e, "[%s] failed to load", segment.getIdentifier()); throw e; @@ -299,6 +299,8 @@ public class ZkCoordinator extends BaseZkCoordinator private final LinkedBlockingQueue queue; private final SettableFuture doneAnnouncing; + private final Object lock =
new Object(); + private volatile boolean finished = false; private volatile ScheduledFuture startedAnnouncing = null; private volatile ScheduledFuture nextAnnoucement = null; @@ -339,25 +341,27 @@ public class ZkCoordinator extends BaseZkCoordinator @Override public void run() { - try { - if (!(finished && queue.isEmpty())) { - List segments = Lists.newLinkedList(); - queue.drainTo(segments); - try { - announcer.announceSegments(segments); - nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS); + synchronized (lock) { + try { + if (!(finished && queue.isEmpty())) { + final List segments = Lists.newLinkedList(); + queue.drainTo(segments); + try { + announcer.announceSegments(segments); + nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS); + } + catch (IOException e) { + doneAnnouncing.setException( + new SegmentLoadingException(e, "Failed to announce segments[%s]", segments) + ); + } + } else { + doneAnnouncing.set(true); } - catch (IOException e) { - doneAnnouncing.setException( - new SegmentLoadingException(e, "Failed to announce segments[%s]", segments) - ); - } - } else { - doneAnnouncing.set(true); } - } - catch (Exception e) { - doneAnnouncing.setException(e); + catch (Exception e) { + doneAnnouncing.setException(e); + } } } }, @@ -368,13 +372,29 @@ public class ZkCoordinator extends BaseZkCoordinator public void finishAnnouncing() throws SegmentLoadingException { - finished = true; - - if (startedAnnouncing != null && (startedAnnouncing.isDone() || !startedAnnouncing.cancel(false))) { - log.info("Waiting for background segment announcing task to complete"); - // background announcing already started, wait for it to complete + synchronized (lock) { + finished = true; + // announce any remaining segments try { - doneAnnouncing.get(); + final List segments = Lists.newLinkedList(); + queue.drainTo(segments); + announcer.announceSegments(segments); + } + catch (Exception e) { + throw new SegmentLoadingException(e, 
"Failed to announce segments[%s]", queue); + } + + // get any exception that may have been thrown in background annoucing + try { + // check in case we did not call startAnnouncing + if (startedAnnouncing != null) { + startedAnnouncing.cancel(false); + } + // - if the task is waiting on the lock, then the queue will be empty by the time it runs + // - if the task just released it, then the lock ensures any exception is set in doneAnnouncing + if (doneAnnouncing.isDone()) { + doneAnnouncing.get(); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -383,14 +403,6 @@ public class ZkCoordinator extends BaseZkCoordinator catch (ExecutionException e) { throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed"); } - } else { - // background task has not started yet, announcing immediately - try { - announcer.announceSegments(queue); - } - catch (Exception e) { - throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue); - } } log.info("Completed background segment announcing"); } @@ -398,9 +410,12 @@ public class ZkCoordinator extends BaseZkCoordinator @Override public void close() { - finished = true; - if (nextAnnoucement != null) { - nextAnnoucement.cancel(false); + // stop background scheduling + synchronized (lock) { + finished = true; + if (nextAnnoucement != null) { + nextAnnoucement.cancel(false); + } } } } From 35fb210cfa42fdc7521ead9fd33995666005c8bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 23 Sep 2014 16:05:32 -0700 Subject: [PATCH 6/6] add test for parallel loading --- .../coordination/ZkCoordinatorTest.java | 91 +++++++++++++++---- 1 file changed, 73 insertions(+), 18 deletions(-) diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java index f50ae3e16fc..666c72c0f5c 100644 --- 
a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java @@ -21,6 +21,7 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.concurrent.ScheduledExecutors; @@ -50,18 +51,22 @@ import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; /** */ public class ZkCoordinatorTest extends CuratorTestBase { private static final Logger log = new Logger(ZkCoordinatorTest.class); + public static final int COUNT = 50; private final ObjectMapper jsonMapper = new DefaultObjectMapper(); private ZkCoordinator zkCoordinator; private ServerManager serverManager; private DataSegmentAnnouncer announcer; private File infoDir; + private AtomicInteger announceCount; @Before public void setUp() throws Exception @@ -101,9 +106,41 @@ public class ZkCoordinatorTest extends CuratorTestBase } }; - announcer = new SingleDataSegmentAnnouncer( - me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper - ); + announceCount = new AtomicInteger(0); + announcer = new DataSegmentAnnouncer() + { + private final DataSegmentAnnouncer delegate = new SingleDataSegmentAnnouncer( + me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper + ); + + @Override + public void announceSegment(DataSegment segment) throws IOException + { + announceCount.incrementAndGet(); + delegate.announceSegment(segment); + } + + @Override + public void unannounceSegment(DataSegment segment) throws IOException + { + announceCount.decrementAndGet(); + delegate.unannounceSegment(segment); + } + + @Override 
+ public void announceSegments(Iterable segments) throws IOException + { + announceCount.addAndGet(Iterables.size(segments)); + delegate.announceSegments(segments); + } + + @Override + public void unannounceSegments(Iterable segments) throws IOException + { + announceCount.addAndGet(-Iterables.size(segments)); + delegate.unannounceSegments(segments); + } + }; zkCoordinator = new ZkCoordinator( jsonMapper, @@ -114,6 +151,18 @@ public class ZkCoordinatorTest extends CuratorTestBase { return infoDir; } + + @Override + public int getNumLoadingThreads() + { + return 5; + } + + @Override + public int getAnnounceIntervalMillis() + { + return 50; + } }, zkPaths, me, @@ -133,21 +182,22 @@ public class ZkCoordinatorTest extends CuratorTestBase @Test public void testLoadCache() throws Exception { - List segments = Lists.newArrayList( - makeSegment("test", "1", new Interval("P1d/2011-04-01")), - makeSegment("test", "1", new Interval("P1d/2011-04-02")), - makeSegment("test", "2", new Interval("P1d/2011-04-02")), - makeSegment("test", "1", new Interval("P1d/2011-04-03")), - makeSegment("test", "1", new Interval("P1d/2011-04-04")), - makeSegment("test", "1", new Interval("P1d/2011-04-05")), - makeSegment("test", "2", new Interval("PT1h/2011-04-04T01")), - makeSegment("test", "2", new Interval("PT1h/2011-04-04T02")), - makeSegment("test", "2", new Interval("PT1h/2011-04-04T03")), - makeSegment("test", "2", new Interval("PT1h/2011-04-04T05")), - makeSegment("test", "2", new Interval("PT1h/2011-04-04T06")), - makeSegment("test2", "1", new Interval("P1d/2011-04-01")), - makeSegment("test2", "1", new Interval("P1d/2011-04-02")) - ); + List segments = Lists.newLinkedList(); + for(int i = 0; i < COUNT; ++i) { + segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-01"))); + segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-02"))); + segments.add(makeSegment("test" + i, "2", new Interval("P1d/2011-04-02"))); + segments.add(makeSegment("test" + i, "1", new 
Interval("P1d/2011-04-03"))); + segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-04"))); + segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-05"))); + segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T01"))); + segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T02"))); + segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T03"))); + segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T05"))); + segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T06"))); + segments.add(makeSegment("test_two" + i, "1", new Interval("P1d/2011-04-01"))); + segments.add(makeSegment("test_two" + i, "1", new Interval("P1d/2011-04-02"))); + } Collections.sort(segments); for (DataSegment segment : segments) { @@ -158,6 +208,11 @@ public class ZkCoordinatorTest extends CuratorTestBase Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty()); zkCoordinator.start(); Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty()); + for(int i = 0; i < COUNT; ++i) { + Assert.assertEquals(11L, serverManager.getDataSourceCounts().get("test" + i).longValue()); + Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue()); + } + Assert.assertEquals(13 * COUNT, announceCount.get()); zkCoordinator.stop(); for (DataSegment segment : segments) {