diff --git a/docs/content/Configuration.md b/docs/content/Configuration.md
index 267d43c7025..5d09851a924 100644
--- a/docs/content/Configuration.md
+++ b/docs/content/Configuration.md
@@ -155,8 +155,10 @@ Druid storage nodes maintain information about segments they have already downlo
 |--------|-----------|-------|
 |`druid.segmentCache.locations`|Segments assigned to a Historical node are first stored on the local file system (in a disk cache) and then served by the Historical node. These locations define where that local cache resides. | none (no caching) |
 |`druid.segmentCache.deleteOnRemove`|Delete segment files from cache once a node is no longer serving a segment.|true|
-|`druid.segmentCache.dropSegmentDelayMillis`|How long a node delays before completely dropping segment.|5 minutes|
+|`druid.segmentCache.dropSegmentDelayMillis`|How long a node delays before completely dropping a segment.|30000 (30 seconds)|
 |`druid.segmentCache.infoDir`|Historical nodes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|
+|`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while they are being loaded from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)|
+|`druid.segmentCache.numLoadingThreads`|How many segments to load concurrently from deep storage.|1|
 
 ### Jetty Server Module
 
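To make the new knobs concrete, here is a hypothetical `runtime.properties` fragment for a Historical node combining them (values are illustrative, not recommendations; the `locations` JSON shape follows the existing `druid.segmentCache.locations` documentation):

```properties
# Illustrative values only -- tune for your hardware and segment count.
druid.segmentCache.locations=[{"path": "/mnt/druid/segmentCache", "maxSize": 10000000000}]
# Pull up to four segments from deep storage in parallel.
druid.segmentCache.numLoadingThreads=4
# Announce batches of loaded segments every 5 seconds during startup;
# set to 0 to announce only after all segments have loaded.
druid.segmentCache.announceIntervalMillis=5000
druid.segmentCache.dropSegmentDelayMillis=30000
```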
diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
index 2cba40194a0..9abe5fc4497 100644
--- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
@@ -40,6 +40,12 @@ public class SegmentLoaderConfig
   @JsonProperty("dropSegmentDelayMillis")
   private int dropSegmentDelayMillis = 30 * 1000; // 30 seconds
 
+  @JsonProperty("announceIntervalMillis")
+  private int announceIntervalMillis = 5 * 1000; // 5 seconds
+
+  @JsonProperty("numLoadingThreads")
+  private int numLoadingThreads = 1;
+
   @JsonProperty
   private File infoDir = null;
 
@@ -58,6 +64,16 @@ public class SegmentLoaderConfig
     return dropSegmentDelayMillis;
   }
 
+  public int getAnnounceIntervalMillis()
+  {
+    return announceIntervalMillis;
+  }
+
+  public int getNumLoadingThreads()
+  {
+    return numLoadingThreads;
+  }
+
   public File getInfoDir()
   {
     if (infoDir == null) {
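Because the new fields are plain `@JsonProperty` members, they bind like any other Jackson-mapped config. A minimal sketch, bypassing Druid's Guice-based property binding and assuming the class keeps its implicit no-arg constructor (the JSON keys are the property names declared above):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.loading.SegmentLoaderConfig;

public class SegmentLoaderConfigSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // Fields absent from the JSON keep their defaults (e.g. numLoadingThreads = 1).
    SegmentLoaderConfig config = mapper.readValue(
        "{\"announceIntervalMillis\": 5000, \"numLoadingThreads\": 4}",
        SegmentLoaderConfig.class
    );
    System.out.println(config.getNumLoadingThreads());      // 4
    System.out.println(config.getAnnounceIntervalMillis()); // 5000
  }
}
```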
diff --git a/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java b/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java
index 9cbafaaa4dd..633ce904059 100644
--- a/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java
+++ b/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java
@@ -26,6 +26,7 @@ import com.metamx.common.logger.Logger;
 import io.druid.client.ServerView;
 import io.druid.concurrent.Execs;
 import io.druid.db.DatabaseSegmentManager;
+import io.druid.segment.loading.SegmentLoaderConfig;
 import io.druid.segment.realtime.DbSegmentPublisher;
 import io.druid.server.coordination.BaseZkCoordinator;
 import io.druid.server.coordination.DataSegmentChangeCallback;
@@ -53,6 +54,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator
   public BridgeZkCoordinator(
       ObjectMapper jsonMapper,
       ZkPathsConfig zkPaths,
+      SegmentLoaderConfig config,
       DruidServerMetadata me,
       @Bridge CuratorFramework curator,
       DbSegmentPublisher dbSegmentPublisher,
@@ -60,7 +62,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator
       ServerView serverView
   )
   {
-    super(jsonMapper, zkPaths, me, curator);
+    super(jsonMapper, zkPaths, config, me, curator);
 
     this.dbSegmentPublisher = dbSegmentPublisher;
     this.databaseSegmentManager = databaseSegmentManager;
diff --git a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java
index e751041747b..f31cfb6311c 100644
--- a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java
@@ -21,10 +21,13 @@ package io.druid.server.coordination;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.emitter.EmittingLogger;
+import io.druid.segment.loading.SegmentLoaderConfig;
 import io.druid.server.initialization.ZkPathsConfig;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.ChildData;
@@ -34,6 +37,8 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 
 import java.io.IOException;
+import java.util.concurrent.Executors;
 
 /**
  */
@@ -45,23 +50,33 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
 
   private final ObjectMapper jsonMapper;
   private final ZkPathsConfig zkPaths;
+  private final SegmentLoaderConfig config;
   private final DruidServerMetadata me;
   private final CuratorFramework curator;
 
   private volatile PathChildrenCache loadQueueCache;
   private volatile boolean started;
+  private final ListeningExecutorService loadingExec;
 
   public BaseZkCoordinator(
       ObjectMapper jsonMapper,
       ZkPathsConfig zkPaths,
+      SegmentLoaderConfig config,
       DruidServerMetadata me,
       CuratorFramework curator
   )
   {
     this.jsonMapper = jsonMapper;
     this.zkPaths = zkPaths;
+    this.config = config;
     this.me = me;
     this.curator = curator;
+    this.loadingExec = MoreExecutors.listeningDecorator(
+        Executors.newFixedThreadPool(
+            config.getNumLoadingThreads(),
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
+        )
+    );
   }
 
   @LifecycleStart
@@ -83,7 +98,7 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
           loadQueueLocation,
           true,
           true,
-          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
+          loadingExec
       );
 
       try {
@@ -200,4 +215,9 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
   public abstract void loadLocalCache();
 
   public abstract DataSegmentChangeHandler getDataSegmentChangeHandler();
+
+  public ListeningExecutorService getLoadingExecutor()
+  {
+    return loadingExec;
+  }
 }
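The executor that backs Curator's `PathChildrenCache` is now the same pool that loads segments, so its size is governed by `numLoadingThreads`. A standalone sketch of the decorator pattern above (the pool size stands in for `config.getNumLoadingThreads()`; `listeningDecorator` wraps a plain pool so `submit()` returns `ListenableFuture`s the coordinator can collect and wait on):

```java
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

public class LoadingPoolSketch
{
  public static void main(String[] args) throws Exception
  {
    // The decorated pool still implements ExecutorService, so the same object
    // can be handed to PathChildrenCache in place of a ThreadFactory.
    final ListeningExecutorService loadingExec = MoreExecutors.listeningDecorator(
        Executors.newFixedThreadPool(
            4,
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
        )
    );

    final ListenableFuture<String> future = loadingExec.submit(
        new Callable<String>()
        {
          @Override
          public String call()
          {
            return "segment loaded on " + Thread.currentThread().getName();
          }
        }
    );
    System.out.println(future.get());
    loadingExec.shutdown();
  }
}
```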
diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
index b0611077aa3..8024edf3daa 100644
--- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
@@ -21,7 +21,11 @@ package io.druid.server.coordination;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.inject.Inject;
+import com.metamx.common.ISE;
 import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.emitter.EmittingLogger;
 import io.druid.segment.loading.SegmentLoaderConfig;
@@ -33,7 +37,11 @@ import org.apache.curator.framework.CuratorFramework;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -60,7 +68,7 @@ public class ZkCoordinator extends BaseZkCoordinator
       ScheduledExecutorFactory factory
   )
   {
-    super(jsonMapper, zkPaths, me, curator);
+    super(jsonMapper, zkPaths, config, me, curator);
 
     this.jsonMapper = jsonMapper;
     this.config = config;
@@ -121,42 +129,47 @@ public class ZkCoordinator extends BaseZkCoordinator
     return ZkCoordinator.this;
   }
 
+  private boolean loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException
+  {
+    final boolean loaded;
+    try {
+      loaded = serverManager.loadSegment(segment);
+    }
+    catch (Exception e) {
+      removeSegment(segment, callback);
+      throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
+    }
+
+    if (loaded) {
+      File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
+      if (!segmentInfoCacheFile.exists()) {
+        try {
+          jsonMapper.writeValue(segmentInfoCacheFile, segment);
+        }
+        catch (IOException e) {
+          removeSegment(segment, callback);
+          throw new SegmentLoadingException(
+              e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
+          );
+        }
+      }
+    }
+    return loaded;
+  }
+
   @Override
   public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
   {
     try {
       log.info("Loading segment %s", segment.getIdentifier());
-
-      final boolean loaded;
-      try {
-        loaded = serverManager.loadSegment(segment);
-      }
-      catch (Exception e) {
-        removeSegment(segment, callback);
-        throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
-      }
-
-      if (loaded) {
-        File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
-        if (!segmentInfoCacheFile.exists()) {
-          try {
-            jsonMapper.writeValue(segmentInfoCacheFile, segment);
-          }
-          catch (IOException e) {
-            removeSegment(segment, callback);
-            throw new SegmentLoadingException(
-                e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
-            );
-          }
-        }
-
+      if (loadSegment(segment, callback)) {
         try {
           announcer.announceSegment(segment);
         }
         catch (IOException e) {
           throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
         }
       }
     }
     catch (SegmentLoadingException e) {
       log.makeAlert(e, "Failed to load segment for dataSource")
@@ -168,60 +181,64 @@ public class ZkCoordinator extends BaseZkCoordinator
     }
   }
 
-  public void addSegments(Iterable<DataSegment> segments, DataSegmentChangeCallback callback)
+  public void addSegments(Iterable<DataSegment> segments, final DataSegmentChangeCallback callback)
   {
-    try {
-      final List<String> segmentFailures = Lists.newArrayList();
-      final List<DataSegment> validSegments = Lists.newArrayList();
+    try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
+            new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
+      backgroundSegmentAnnouncer.startAnnouncing();
 
-      for (DataSegment segment : segments) {
-        log.info("Loading segment %s", segment.getIdentifier());
+      final List<ListenableFuture<Void>> segmentLoading = Lists.newArrayList();
 
-        final boolean loaded;
+      for (final DataSegment segment : segments) {
+        segmentLoading.add(
+            getLoadingExecutor().submit(
+                new Callable<Void>()
+                {
+                  @Override
+                  public Void call() throws SegmentLoadingException
+                  {
+                    try {
+                      log.info("Loading segment %s", segment.getIdentifier());
+                      final boolean loaded = loadSegment(segment, callback);
+                      if (loaded) {
+                        try {
+                          backgroundSegmentAnnouncer.announceSegment(segment);
+                        }
+                        catch (InterruptedException e) {
+                          Thread.currentThread().interrupt();
+                          throw new SegmentLoadingException(e, "Loading Interrupted");
+                        }
+                      }
+                      return null;
+                    } catch (SegmentLoadingException e) {
+                      log.error(e, "[%s] failed to load", segment.getIdentifier());
+                      throw e;
+                    }
+                  }
+                }
+            )
+        );
+      }
+
+      int failed = 0;
+      for (ListenableFuture<Void> future : segmentLoading) {
         try {
-          loaded = serverManager.loadSegment(segment);
-        }
-        catch (Exception e) {
-          log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
-          removeSegment(segment, callback);
-          segmentFailures.add(segment.getIdentifier());
-          continue;
-        }
-
-        if (loaded) {
-          File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
-          if (!segmentInfoCacheFile.exists()) {
-            try {
-              jsonMapper.writeValue(segmentInfoCacheFile, segment);
-            }
-            catch (IOException e) {
-              log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
-              removeSegment(segment, callback);
-              segmentFailures.add(segment.getIdentifier());
-              continue;
-            }
-          }
-
-          validSegments.add(segment);
+          future.get();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new SegmentLoadingException(e, "Loading Interrupted");
+        } catch (ExecutionException e) {
+          failed++;
         }
       }
-
-      try {
-        announcer.announceSegments(validSegments);
-      }
-      catch (IOException e) {
-        throw new SegmentLoadingException(e, "Failed to announce segments[%s]", segments);
+      if (failed > 0) {
+        throw new SegmentLoadingException("%,d errors seen while loading segments", failed);
       }
 
-      if (!segmentFailures.isEmpty()) {
-        for (String segmentFailure : segmentFailures) {
-          log.error("%s failed to load", segmentFailure);
-        }
-        throw new SegmentLoadingException("%,d errors seen while loading segments", segmentFailures.size());
-      }
+      backgroundSegmentAnnouncer.finishAnnouncing();
     }
     catch (SegmentLoadingException e) {
-      log.makeAlert(e, "Failed to load segments for dataSource")
+      log.makeAlert(e, "Failed to load segments")
          .addData("segments", segments)
          .emit();
     }
@@ -272,4 +289,134 @@ public class ZkCoordinator extends BaseZkCoordinator
       callback.execute();
     }
   }
+
+  private static class BackgroundSegmentAnnouncer implements AutoCloseable
+  {
+    private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class);
+
+    private final int intervalMillis;
+    private final DataSegmentAnnouncer announcer;
+    private final ScheduledExecutorService exec;
+    private final LinkedBlockingQueue<DataSegment> queue;
+    private final SettableFuture<Boolean> doneAnnouncing;
+
+    private final Object lock = new Object();
+
+    private volatile boolean finished = false;
+    private volatile ScheduledFuture<?> startedAnnouncing = null;
+    private volatile ScheduledFuture<?> nextAnnouncement = null;
+
+    public BackgroundSegmentAnnouncer(
+        DataSegmentAnnouncer announcer,
+        ScheduledExecutorService exec,
+        int intervalMillis
+    )
+    {
+      this.announcer = announcer;
+      this.exec = exec;
+      this.intervalMillis = intervalMillis;
+      this.queue = Queues.newLinkedBlockingQueue();
+      this.doneAnnouncing = SettableFuture.create();
+    }
+
+    public void announceSegment(final DataSegment segment) throws InterruptedException
+    {
+      if (finished) {
+        throw new ISE("Announce segment called after finishAnnouncing");
+      }
+      queue.put(segment);
+    }
+
+    public void startAnnouncing()
+    {
+      if (intervalMillis <= 0) {
+        return;
+      }
+
+      log.info("Starting background segment announcing task");
+
+      // schedule background announcing task
+      nextAnnouncement = startedAnnouncing = exec.schedule(
+          new Runnable()
+          {
+            @Override
+            public void run()
+            {
+              synchronized (lock) {
+                try {
+                  if (!(finished && queue.isEmpty())) {
+                    final List<DataSegment> segments = Lists.newLinkedList();
+                    queue.drainTo(segments);
+                    try {
+                      announcer.announceSegments(segments);
+                      nextAnnouncement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
+                    }
+                    catch (IOException e) {
+                      doneAnnouncing.setException(
+                          new SegmentLoadingException(e, "Failed to announce segments[%s]", segments)
+                      );
+                    }
+                  } else {
+                    doneAnnouncing.set(true);
+                  }
+                }
+                catch (Exception e) {
+                  doneAnnouncing.setException(e);
+                }
+              }
+            }
+          },
+          intervalMillis,
+          TimeUnit.MILLISECONDS
+      );
+    }
+
+    public void finishAnnouncing() throws SegmentLoadingException
+    {
+      synchronized (lock) {
+        finished = true;
+        // announce any remaining segments
+        final List<DataSegment> segments = Lists.newLinkedList();
+        queue.drainTo(segments);
+        try {
+          announcer.announceSegments(segments);
+        }
+        catch (Exception e) {
+          throw new SegmentLoadingException(e, "Failed to announce segments[%s]", segments);
+        }
+
+        // get any exception that may have been thrown in background announcing
+        try {
+          // check in case we did not call startAnnouncing
+          if (startedAnnouncing != null) {
+            startedAnnouncing.cancel(false);
+          }
+          // - if the task is waiting on the lock, then the queue will be empty by the time it runs
+          // - if the task just released it, then the lock ensures any exception is set in doneAnnouncing
+          if (doneAnnouncing.isDone()) {
+            doneAnnouncing.get();
+          }
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new SegmentLoadingException(e, "Loading Interrupted");
+        }
+        catch (ExecutionException e) {
+          throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed");
+        }
+      }
+      log.info("Completed background segment announcing");
+    }
+
+    @Override
+    public void close()
+    {
+      // stop background scheduling
+      synchronized (lock) {
+        finished = true;
+        if (nextAnnouncement != null) {
+          nextAnnouncement.cancel(false);
+        }
+      }
+    }
+  }
 }
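The `BackgroundSegmentAnnouncer` is a small producer/consumer: loading threads `put` segments on a queue, a self-rescheduling task drains and announces them in batches every `intervalMillis`, and `finishAnnouncing` flushes whatever remains. A toy standalone version of that control flow, omitting the locking and failure propagation of the real class (strings stand in for segments, `println` for the announcer):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BatchAnnouncerSketch
{
  private static final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  private static final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
  private static volatile boolean finished = false;
  private static final int intervalMillis = 50;

  public static void main(String[] args) throws Exception
  {
    // Self-rescheduling drain task: runs, announces a batch, then re-arms
    // itself until the producers are finished and the queue is empty.
    exec.schedule(
        new Runnable()
        {
          @Override
          public void run()
          {
            if (!(finished && queue.isEmpty())) {
              final List<String> batch = new ArrayList<>();
              queue.drainTo(batch);
              System.out.println("announcing batch: " + batch);
              exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
            }
          }
        },
        intervalMillis,
        TimeUnit.MILLISECONDS
    );

    for (int i = 0; i < 10; i++) {
      queue.put("segment-" + i); // producer side: loading threads
      Thread.sleep(20);
    }
    finished = true;
    Thread.sleep(2 * intervalMillis); // let the final drain run
    exec.shutdown();
  }
}
```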
diff --git a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java
index 0136a99482e..670fd892c24 100644
--- a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java
+++ b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java
@@ -31,6 +31,7 @@ import io.druid.curator.PotentiallyGzippedCompressionProvider;
 import io.druid.curator.announcement.Announcer;
 import io.druid.db.DatabaseSegmentManager;
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.segment.loading.SegmentLoaderConfig;
 import io.druid.segment.realtime.DbSegmentPublisher;
 import io.druid.server.DruidNode;
 import io.druid.server.coordination.BatchDataSegmentAnnouncer;
@@ -156,6 +157,7 @@ public class DruidClusterBridgeTest
     BridgeZkCoordinator bridgeZkCoordinator = new BridgeZkCoordinator(
         jsonMapper,
         zkPathsConfig,
+        new SegmentLoaderConfig(),
         metadata,
         remoteCf,
         dbSegmentPublisher,
diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java
index f50ae3e16fc..666c72c0f5c 100644
--- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java
+++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java
@@ -21,6 +21,7 @@ package io.druid.server.coordination;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.concurrent.ScheduledExecutors;
@@ -50,18 +51,22 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  */
 public class ZkCoordinatorTest extends CuratorTestBase
 {
   private static final Logger log = new Logger(ZkCoordinatorTest.class);
+  public static final int COUNT = 50;
   private final ObjectMapper jsonMapper = new DefaultObjectMapper();
   private ZkCoordinator zkCoordinator;
   private ServerManager serverManager;
   private DataSegmentAnnouncer announcer;
   private File infoDir;
+  private AtomicInteger announceCount;
 
   @Before
   public void setUp() throws Exception
@@ -101,9 +106,41 @@ public class ZkCoordinatorTest extends CuratorTestBase
       }
     };
 
-    announcer = new SingleDataSegmentAnnouncer(
-        me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper
-    );
+    announceCount = new AtomicInteger(0);
+    announcer = new DataSegmentAnnouncer()
+    {
+      private final DataSegmentAnnouncer delegate = new SingleDataSegmentAnnouncer(
+          me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper
+      );
+
+      @Override
+      public void announceSegment(DataSegment segment) throws IOException
+      {
+        announceCount.incrementAndGet();
+        delegate.announceSegment(segment);
+      }
+
+      @Override
+      public void unannounceSegment(DataSegment segment) throws IOException
+      {
+        announceCount.decrementAndGet();
+        delegate.unannounceSegment(segment);
+      }
+
+      @Override
+      public void announceSegments(Iterable<DataSegment> segments) throws IOException
+      {
+        announceCount.addAndGet(Iterables.size(segments));
+        delegate.announceSegments(segments);
+      }
+
+      @Override
+      public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
+      {
+        announceCount.addAndGet(-Iterables.size(segments));
+        delegate.unannounceSegments(segments);
+      }
+    };
 
     zkCoordinator = new ZkCoordinator(
         jsonMapper,
@@ -114,6 +151,18 @@ public class ZkCoordinatorTest extends CuratorTestBase
           {
             return infoDir;
           }
+
+          @Override
+          public int getNumLoadingThreads()
+          {
+            return 5;
+          }
+
+          @Override
+          public int getAnnounceIntervalMillis()
+          {
+            return 50;
+          }
         },
         zkPaths,
         me,
@@ -133,21 +182,22 @@ public class ZkCoordinatorTest extends CuratorTestBase
   @Test
   public void testLoadCache() throws Exception
   {
-    List<DataSegment> segments = Lists.newArrayList(
-        makeSegment("test", "1", new Interval("P1d/2011-04-01")),
-        makeSegment("test", "1", new Interval("P1d/2011-04-02")),
-        makeSegment("test", "2", new Interval("P1d/2011-04-02")),
-        makeSegment("test", "1", new Interval("P1d/2011-04-03")),
-        makeSegment("test", "1", new Interval("P1d/2011-04-04")),
-        makeSegment("test", "1", new Interval("P1d/2011-04-05")),
-        makeSegment("test", "2", new Interval("PT1h/2011-04-04T01")),
-        makeSegment("test", "2", new Interval("PT1h/2011-04-04T02")),
-        makeSegment("test", "2", new Interval("PT1h/2011-04-04T03")),
-        makeSegment("test", "2", new Interval("PT1h/2011-04-04T05")),
-        makeSegment("test", "2", new Interval("PT1h/2011-04-04T06")),
-        makeSegment("test2", "1", new Interval("P1d/2011-04-01")),
-        makeSegment("test2", "1", new Interval("P1d/2011-04-02"))
-    );
+    List<DataSegment> segments = Lists.newLinkedList();
+    for (int i = 0; i < COUNT; ++i) {
+      segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-01")));
+      segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-02")));
+      segments.add(makeSegment("test" + i, "2", new Interval("P1d/2011-04-02")));
+      segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-03")));
+      segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-04")));
+      segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-05")));
+      segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T01")));
+      segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T02")));
+      segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T03")));
+      segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T05")));
+      segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T06")));
+      segments.add(makeSegment("test_two" + i, "1", new Interval("P1d/2011-04-01")));
+      segments.add(makeSegment("test_two" + i, "1", new Interval("P1d/2011-04-02")));
+    }
     Collections.sort(segments);
 
     for (DataSegment segment : segments) {
@@ -158,6 +208,11 @@ public class ZkCoordinatorTest extends CuratorTestBase
     Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty());
     zkCoordinator.start();
     Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty());
+    for (int i = 0; i < COUNT; ++i) {
+      Assert.assertEquals(11L, serverManager.getDataSourceCounts().get("test" + i).longValue());
+      Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue());
+    }
+    Assert.assertEquals(13 * COUNT, announceCount.get());
     zkCoordinator.stop();
 
     for (DataSegment segment : segments) {