From cd0fa34a73bfd38b133fc0e1a28f4d26b947cfc7 Mon Sep 17 00:00:00 2001
From: cheddar
Date: Tue, 25 Jun 2013 11:18:04 -0700
Subject: [PATCH 1/5] 1) Log exception when there is an error removing a
 segment

---
 .../main/java/com/metamx/druid/coordination/ZkCoordinator.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
index 1befc1df888..0740be81b5d 100644
--- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
+++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
@@ -261,7 +261,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
         announcer.unannounceSegment(segment);
       }
       catch (Exception e) {
-        log.makeAlert("Failed to remove segment")
+        log.makeAlert(e, "Failed to remove segment")
             .addData("segment", segment)
             .emit();
       }

From 797a083b698a120d269a753f7fcd96f617659e7f Mon Sep 17 00:00:00 2001
From: cheddar
Date: Mon, 1 Jul 2013 14:56:57 -0700
Subject: [PATCH 2/5] 1) Adjust SingleSegmentLoader to allow for storing
 segments on multiple different mount points.  The specification language is
 really janky right now, so this is remaining a stealth feature for the time
 being.

---
 .../druid/indexing/common/TaskToolbox.java    |  12 +-
 .../druid/loading/DataSegmentPuller.java      |   4 +
 .../druid/loading/DataSegmentPusherUtil.java  |  18 --
 .../druid/loading/SegmentLoaderConfig.java    |  10 +-
 .../druid/loading/SingleSegmentLoader.java    | 256 +++++++++++-------
 5 files changed, 173 insertions(+), 127 deletions(-)

diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java
index 0ee2a6e7632..5bbfd73abbe 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java
@@ -24,6 +24,10 @@ import com.google.common.collect.Maps;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.client.ServerView;
 import com.metamx.druid.coordination.DataSegmentAnnouncer;
+import com.metamx.druid.indexing.common.actions.TaskActionClient;
+import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
+import com.metamx.druid.indexing.common.config.TaskConfig;
+import com.metamx.druid.indexing.common.task.Task;
 import com.metamx.druid.loading.DataSegmentKiller;
 import com.metamx.druid.loading.DataSegmentPusher;
 import com.metamx.druid.loading.MMappedQueryableIndexFactory;
@@ -31,10 +35,6 @@ import com.metamx.druid.loading.S3DataSegmentPuller;
 import com.metamx.druid.loading.SegmentLoaderConfig;
 import com.metamx.druid.loading.SegmentLoadingException;
 import com.metamx.druid.loading.SingleSegmentLoader;
-import com.metamx.druid.indexing.common.actions.TaskActionClient;
-import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
-import com.metamx.druid.indexing.common.config.TaskConfig;
-import com.metamx.druid.indexing.common.task.Task;
 import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
 import com.metamx.emitter.service.ServiceEmitter;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@@ -141,9 +141,9 @@ public class TaskToolbox
         new SegmentLoaderConfig()
         {
           @Override
-          public File getCacheDirectory()
+          public String getCacheDirectory()
           {
-            return new File(getTaskWorkDir(), "fetched_segments");
+            return new File(getTaskWorkDir(), "fetched_segments").toString();
"fetched_segments").toString(); } } ); diff --git a/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java index b821c653a6e..306d7f449af 100644 --- a/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java +++ b/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java @@ -39,9 +39,13 @@ public interface DataSegmentPuller /** * Returns the last modified time of the given segment. * + * Note, this is not actually used at this point and doesn't need to actually be implemented. It's just still here + * to not break compatibility. + * * @param segment The segment to check the last modified time for * @return the last modified time in millis from the epoch * @throws SegmentLoadingException if there are any errors */ + @Deprecated public long getLastModified(DataSegment segment) throws SegmentLoadingException; } diff --git a/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java b/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java index bc44f82f3dd..e72bd787bb3 100644 --- a/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java +++ b/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java @@ -20,32 +20,14 @@ package com.metamx.druid.loading; import com.google.common.base.Joiner; -import com.metamx.common.MapUtils; import com.metamx.druid.client.DataSegment; -import java.util.Map; - /** */ public class DataSegmentPusherUtil { private static final Joiner JOINER = Joiner.on("/").skipNulls(); - public static String getLegacyStorageDir(DataSegment segment) - { - final Map loadSpec = segment.getLoadSpec(); - - String specType = MapUtils.getString(loadSpec, "type"); - if (specType.startsWith("s3")) { - String s3Bucket = MapUtils.getString(loadSpec, "bucket"); - String s3Path = MapUtils.getString(loadSpec, "key"); - - return String.format("%s/%s", s3Bucket, s3Path.substring(0, s3Path.lastIndexOf("/"))); - } - - return null; - } - public static String getStorageDir(DataSegment segment) { return JOINER.join( diff --git a/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java b/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java index 294c91b9a38..8a0e32484e1 100644 --- a/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java @@ -21,14 +21,18 @@ package com.metamx.druid.loading; import org.skife.config.Config; -import java.io.File; - /** */ public abstract class SegmentLoaderConfig { @Config({"druid.paths.indexCache", "druid.segmentCache.path"}) - public abstract File getCacheDirectory(); + public abstract String getCacheDirectory(); + + @Config("druid.server.maxSize") + public long getServerMaxSize() + { + return Long.MAX_VALUE; + } @Config("druid.segmentCache.deleteOnRemove") public boolean deleteOnRemove() diff --git a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java index 61e9986f484..f0e9a7f20e8 100644 --- a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java +++ b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java @@ -19,9 +19,13 @@ package com.metamx.druid.loading; -import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; +import 
+import com.google.common.primitives.Longs;
 import com.google.inject.Inject;
-import com.metamx.common.StreamUtils;
+import com.metamx.common.IAE;
+import com.metamx.common.ISE;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.index.QueryableIndex;
@@ -29,7 +33,11 @@ import com.metamx.druid.index.QueryableIndexSegment;
 import com.metamx.druid.index.Segment;
 import org.apache.commons.io.FileUtils;

-import java.io.*;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;

 /**
  */
@@ -39,8 +47,8 @@ public class SingleSegmentLoader implements SegmentLoader

   private final DataSegmentPuller dataSegmentPuller;
   private final QueryableIndexFactory factory;
-  private final SegmentLoaderConfig config;
-  private static final Joiner JOINER = Joiner.on("/").skipNulls();
+
+  private final List<StorageLocation> locations;

   @Inject
   public SingleSegmentLoader(
@@ -51,22 +59,52 @@
   {
     this.dataSegmentPuller = dataSegmentPuller;
     this.factory = factory;
-    this.config = config;
+
+    final ImmutableList.Builder<StorageLocation> locBuilder = ImmutableList.builder();
+
+    // This is a really, really stupid way of getting this information.  Splitting on commas and bars is error-prone
+    // We should instead switch it up to be a JSON Array of JSON Object or something and cool stuff like that
+    // But, that'll have to wait for some other day.
+    for (String dirSpec : config.getCacheDirectory().split(",")) {
+      String[] dirSplit = dirSpec.split("\\|");
+      if (dirSplit.length == 1) {
+        locBuilder.add(new StorageLocation(new File(dirSplit[0]), config.getServerMaxSize()));
+      }
+      else if (dirSplit.length == 2) {
+        final Long maxSize = Longs.tryParse(dirSplit[1]);
+        if (maxSize == null) {
+          throw new IAE("Size of a local segment storage location must be an integral number, got[%s]", dirSplit[1]);
+        }
+        locBuilder.add(new StorageLocation(new File(dirSplit[0]), maxSize));
+      }
+      else {
+        throw new ISE(
+            "Unknown segment storage location[%s]=>[%s], config[%s].",
+            dirSplit.length, dirSpec, config.getCacheDirectory()
+        );
+      }
+    }
+    locations = locBuilder.build();
+
+    Preconditions.checkArgument(locations.size() > 0, "Must have at least one segment cache directory.");
+    log.info("Using storage locations[%s]", locations);
   }

   @Override
   public boolean isSegmentLoaded(final DataSegment segment)
   {
-    File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
-    if (localStorageDir.exists()) {
-      return true;
-    }
+    return findStorageLocationIfLoaded(segment) != null;
+  }

-    final File legacyStorageDir = new File(
-        config.getCacheDirectory(),
-        DataSegmentPusherUtil.getLegacyStorageDir(segment)
-    );
-    return legacyStorageDir.exists();
+  public StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
+  {
+    for (StorageLocation location : locations) {
+      File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
+      if (localStorageDir.exists()) {
+        return location;
+      }
+    }
+    return null;
   }

   @Override
@@ -80,111 +118,129 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
   {
-    File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
+    StorageLocation loc = findStorageLocationIfLoaded(segment);

-    final String legacyDir = DataSegmentPusherUtil.getLegacyStorageDir(segment);
-    if (legacyDir != null) {
-      File legacyStorageDir = new File(config.getCacheDirectory(), legacyDir);
+    final File retVal;

-      if (legacyStorageDir.exists()) {
-        log.info("Found legacyStorageDir[%s], moving to new storage location[%s]", legacyStorageDir, localStorageDir);
-        if (localStorageDir.exists()) {
-          try {
-            FileUtils.deleteDirectory(localStorageDir);
-          }
-          catch (IOException e) {
-            throw new SegmentLoadingException(e, "Error deleting localDir[%s]", localStorageDir);
-          }
-        }
-        final File parentDir = localStorageDir.getParentFile();
-        if (!parentDir.exists()) {
-          log.info("Parent[%s] didn't exist, creating.", parentDir);
-          if (!parentDir.mkdirs()) {
-            log.warn("Unable to make parentDir[%s]", parentDir);
-          }
-        }
-
-        if (!legacyStorageDir.renameTo(localStorageDir)) {
-          log.warn("Failed moving [%s] to [%s]", legacyStorageDir, localStorageDir);
-        }
+    if (loc == null) {
+      Iterator<StorageLocation> locIter = locations.iterator();
+      loc = locIter.next();
+      while (locIter.hasNext()) {
+        loc = loc.mostEmpty(locIter.next());
       }
-    }

-    if (localStorageDir.exists()) {
-      long localLastModified = localStorageDir.lastModified();
-      long remoteLastModified = dataSegmentPuller.getLastModified(segment);
-      if (remoteLastModified > 0 && localLastModified >= remoteLastModified) {
-        log.info(
-            "Found localStorageDir[%s] with modified[%s], which is same or after remote[%s]. Using.",
-            localStorageDir, localLastModified, remoteLastModified
-        );
-        return localStorageDir;
-      }
-    }
-
-    if (localStorageDir.exists()) {
-      try {
-        FileUtils.deleteDirectory(localStorageDir);
-      }
-      catch (IOException e) {
-        log.warn(e, "Exception deleting previously existing local dir[%s]", localStorageDir);
-      }
-    }
-    if (!localStorageDir.mkdirs()) {
-      log.info("Unable to make parent file[%s]", localStorageDir);
-    }
-
-    dataSegmentPuller.getSegmentFiles(segment, localStorageDir);
-
-    return localStorageDir;
-  }
-
-  private File getLocalStorageDir(DataSegment segment)
-  {
-    String outputKey = JOINER.join(
-        segment.getDataSource(),
-        String.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
-        segment.getVersion(),
-        segment.getShardSpec().getPartitionNum()
-    );
-
-    return new File(config.getCacheDirectory(), outputKey);
-  }
-
-  private void moveToCache(File pulledFile, File cacheFile) throws SegmentLoadingException
-  {
-    log.info("Rename pulledFile[%s] to cacheFile[%s]", pulledFile, cacheFile);
-    if (!pulledFile.renameTo(cacheFile)) {
-      log.warn("Error renaming pulledFile[%s] to cacheFile[%s].  Copying instead.", pulledFile, cacheFile);
-
-      try {
-        StreamUtils.copyToFileAndClose(new FileInputStream(pulledFile), cacheFile);
-      }
-      catch (IOException e) {
-        throw new SegmentLoadingException(
-            e,
-            "Problem moving pulledFile[%s] to cache[%s]",
-            pulledFile,
-            cacheFile
+      if (!loc.canHandle(segment.getSize())) {
+        throw new ISE(
+            "Segment[%s:%,d] too large for storage[%s:%,d].",
+            segment.getIdentifier(), segment.getSize(), loc.getPath(), loc.available()
         );
       }
-      if (!pulledFile.delete()) {
-        log.error("Could not delete pulledFile[%s].", pulledFile);
+
+      File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
+      if (!storageDir.mkdirs()) {
+        log.debug("Unable to make parent file[%s]", storageDir);
       }
+
+      dataSegmentPuller.getSegmentFiles(segment, storageDir);
+      loc.addSegment(segment);
+
+      retVal = storageDir;
     }
+    else {
+      retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
+    }
+
+    loc.addSegment(segment);
+
+    return retVal;
   }

   @Override
   public void cleanup(DataSegment segment) throws SegmentLoadingException
   {
-    File cacheFile = getLocalStorageDir(segment);
+    StorageLocation loc = findStorageLocationIfLoaded(segment);
+
+    if (loc == null) {
+      log.info("Asked to cleanup something[%s] that didn't exist.  Skipping.", segment);
+      return;
+    }

     try {
+      File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
       log.info("Deleting directory[%s]", cacheFile);
       FileUtils.deleteDirectory(cacheFile);
+      loc.removeSegment(segment);
     }
     catch (IOException e) {
       throw new SegmentLoadingException(e, e.getMessage());
     }
   }
+
+  private static class StorageLocation
+  {
+    private final File path;
+    private final long maxSize;
+    private final Set<DataSegment> segments;
+
+    private volatile long currSize = 0;
+
+    StorageLocation(
+        File path,
+        long maxSize
+    )
+    {
+      this.path = path;
+      this.maxSize = maxSize;
+
+      this.segments = Sets.newHashSet();
+    }
+
+    private File getPath()
+    {
+      return path;
+    }
+
+    private Long getMaxSize()
+    {
+      return maxSize;
+    }
+
+    private synchronized void addSegment(DataSegment segment)
+    {
+      if (segments.add(segment)) {
+        currSize += segment.getSize();
+      }
+    }
+
+    private synchronized void removeSegment(DataSegment segment)
+    {
+      if (segments.remove(segment)) {
+        currSize -= segment.getSize();
+      }
+    }
+
+    private boolean canHandle(long size)
+    {
+      return available() > size;
+    }
+
+    private synchronized long available()
+    {
+      return maxSize - currSize;
+    }
+
+    private StorageLocation mostEmpty(StorageLocation other)
+    {
+      return available() > other.available() ? this : other;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "StorageLocation{" +
+             "path=" + path +
+             ", maxSize=" + maxSize +
+             '}';
+    }
+  }
 }
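(For concreteness on the stealth feature above: the cache-directory spec is a comma-separated list of locations, each optionally carrying a "|maxSize" suffix, e.g. druid.segmentCache.path=/mnt/disk1|100000000000,/mnt/disk2|100000000000. The following minimal, self-contained sketch mirrors the constructor's parsing and the "most empty location wins" selection in getSegmentFiles(). The class and method names here are illustrative only, and the JDK's Long.parseLong stands in for Guava's Longs.tryParse.)

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class CacheDirectorySpecDemo
{
  static class Location
  {
    final File path;
    final long maxSize;
    long currSize = 0;

    Location(File path, long maxSize)
    {
      this.path = path;
      this.maxSize = maxSize;
    }

    long available()
    {
      return maxSize - currSize;
    }

    @Override
    public String toString()
    {
      return path + "|" + maxSize;
    }
  }

  // Parses e.g. "/mnt/disk1|100000000000,/mnt/disk2": a bare path gets the
  // server-wide default cap, a path|size pair gets an explicit one.
  static List<Location> parse(String spec, long defaultMaxSize)
  {
    List<Location> locations = new ArrayList<Location>();
    for (String dirSpec : spec.split(",")) {
      String[] dirSplit = dirSpec.split("\\|");
      if (dirSplit.length == 1) {
        locations.add(new Location(new File(dirSplit[0]), defaultMaxSize));
      } else if (dirSplit.length == 2) {
        // Long.parseLong throws NumberFormatException on a malformed size,
        // where the patch uses Longs.tryParse and throws IAE on null.
        locations.add(new Location(new File(dirSplit[0]), Long.parseLong(dirSplit[1])));
      } else {
        throw new IllegalArgumentException("Unknown segment storage location: " + dirSpec);
      }
    }
    if (locations.isEmpty()) {
      throw new IllegalArgumentException("Must have at least one segment cache directory.");
    }
    return locations;
  }

  // The same reduction as the loader's mostEmpty() loop: the location with
  // the most available space receives the next segment.
  static Location mostEmpty(List<Location> locations)
  {
    Location best = locations.get(0);
    for (Location candidate : locations) {
      if (candidate.available() > best.available()) {
        best = candidate;
      }
    }
    return best;
  }

  public static void main(String[] args)
  {
    List<Location> locations = parse("/mnt/disk1|100000000000,/mnt/disk2|100000000000", 0L);
    locations.get(0).currSize = 60000000000L; // pretend disk1 already holds 60GB
    System.out.println("locations: " + locations);
    System.out.println("next segment goes to: " + mostEmpty(locations).path);
  }
}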
From 090921b43e662013e5aa8c1cc111ba8d95530fcc Mon Sep 17 00:00:00 2001
From: cheddar
Date: Mon, 1 Jul 2013 16:04:25 -0700
Subject: [PATCH 3/5] [maven-release-plugin] prepare release druid-0.5.5

---
 client/pom.xml           | 2 +-
 common/pom.xml           | 2 +-
 examples/pom.xml         | 2 +-
 indexing-common/pom.xml  | 2 +-
 indexing-hadoop/pom.xml  | 2 +-
 indexing-service/pom.xml | 2 +-
 pom.xml                  | 2 +-
 realtime/pom.xml         | 2 +-
 server/pom.xml           | 2 +-
 services/pom.xml         | 4 ++--
 10 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/client/pom.xml b/client/pom.xml
index 27cc070c519..2c37bb02a2a 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.5</version>
     </parent>

diff --git a/common/pom.xml b/common/pom.xml
index ba47a0296bd..a6b3fcbe020 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.5</version>
     </parent>

diff --git a/examples/pom.xml b/examples/pom.xml
index 4a1cbc929ac..6fb7b76e3e5 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -9,7 +9,7 @@
     <parent>
         <groupId>com.metamx</groupId>
        <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.5</version>
     </parent>

diff --git a/indexing-common/pom.xml b/indexing-common/pom.xml
index 3971a922c5b..7deb59222d2 100644
--- a/indexing-common/pom.xml
+++ b/indexing-common/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.5</version>
     </parent>

diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 1da0d489862..6ec0aba4853 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.5</version>
     </parent>

diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index d27563fceea..e1fca9927ab 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.5</version>
     </parent>

diff --git a/pom.xml b/pom.xml
index 83dcd517782..c74db7565a3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
     <packaging>pom</packaging>
-    <version>0.5.5-SNAPSHOT</version>
+    <version>0.5.5</version>
     <name>druid</name>
     <description>druid</description>

diff --git a/realtime/pom.xml b/realtime/pom.xml
index 829d724d95f..1bed28e864a 100644
--- a/realtime/pom.xml
+++ b/realtime/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.5</version>
     </parent>

diff --git a/server/pom.xml b/server/pom.xml
index 3f9b33f5ba8..6c06ccdfb49 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.5</version>
     </parent>

diff --git a/services/pom.xml b/services/pom.xml
index 18abf630390..3955302cf61 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -24,11 +24,11 @@
     <artifactId>druid-services</artifactId>
     <name>druid-services</name>
     <description>druid-services</description>
-    <version>0.5.5-SNAPSHOT</version>
+    <version>0.5.5</version>
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.5</version>
     </parent>

From fd1d73e83abdff8dcea9d3c8bcc9c3278ce92dea Mon Sep 17 00:00:00 2001
From: cheddar
Date: Mon, 1 Jul 2013 16:04:31 -0700
Subject: [PATCH 4/5] [maven-release-plugin] prepare for next development
 iteration

---
 client/pom.xml           | 2 +-
 common/pom.xml           | 2 +-
 examples/pom.xml         | 2 +-
 indexing-common/pom.xml  | 2 +-
 indexing-hadoop/pom.xml  | 2 +-
 indexing-service/pom.xml | 2 +-
 pom.xml                  | 2 +-
 realtime/pom.xml         | 2 +-
 server/pom.xml           | 2 +-
 services/pom.xml         | 4 ++--
 10 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/client/pom.xml b/client/pom.xml
index 2c37bb02a2a..6979f20000c 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>
diff --git a/common/pom.xml b/common/pom.xml
index a6b3fcbe020..1964f5658ff 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>

diff --git a/examples/pom.xml b/examples/pom.xml
index 6fb7b76e3e5..d3bd28bd236 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -9,7 +9,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>

diff --git a/indexing-common/pom.xml b/indexing-common/pom.xml
index 7deb59222d2..fd727328c49 100644
--- a/indexing-common/pom.xml
+++ b/indexing-common/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>

diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 6ec0aba4853..a8b8d429c79 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>

diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index e1fca9927ab..37cb5b9c2e4 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>

diff --git a/pom.xml b/pom.xml
index c74db7565a3..ede50cdf143 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
     <packaging>pom</packaging>
-    <version>0.5.5</version>
+    <version>0.5.6-SNAPSHOT</version>
     <name>druid</name>
     <description>druid</description>

diff --git a/realtime/pom.xml b/realtime/pom.xml
index 1bed28e864a..abe217c1af2 100644
--- a/realtime/pom.xml
+++ b/realtime/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>

diff --git a/server/pom.xml b/server/pom.xml
index 6c06ccdfb49..e32b027219a 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>

diff --git a/services/pom.xml b/services/pom.xml
index 3955302cf61..d01c1f44706 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -24,11 +24,11 @@
     <artifactId>druid-services</artifactId>
     <name>druid-services</name>
     <description>druid-services</description>
-    <version>0.5.5</version>
+    <version>0.5.6-SNAPSHOT</version>
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>

From 07129418257496c2ceb4b716c40c68717d28036b Mon Sep 17 00:00:00 2001
From: cheddar
Date: Tue, 2 Jul 2013 11:53:53 -0700
Subject: [PATCH 5/5] 1) Add check whether a Hydrant has already been persisted
 before persisting.  Persisting happens synchronously on the same thread, but
 multiple persist requests can be queued up on that thread which means that
 subsequent ones would fail with an NPE.  Fixes #178

---
 .../druid/realtime/plumber/RealtimePlumberSchool.java | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java
index 5ba229ea56a..a429fbef9d5 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java
@@ -662,6 +662,14 @@ public class RealtimePlumberSchool implements PlumberSchool
    */
   private int persistHydrant(FireHydrant indexToPersist, Schema schema, Interval interval)
   {
+    if (indexToPersist.hasSwapped()) {
+      log.info(
+          "DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
+          schema.getDataSource(), interval, indexToPersist
+      );
+      return 0;
+    }
+
     log.info("DataSource[%s], Interval[%s], persisting Hydrant[%s]", schema.getDataSource(), interval, indexToPersist);
     try {
       int numRows = indexToPersist.getIndex().size();
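
(For concreteness on patch 5: persists execute one at a time on a single persist thread, but several requests for the same hydrant can be queued before the first one runs; the first persist swaps the in-memory index out for the on-disk segment, so an unguarded second run would dereference the cleared index and throw an NPE. Below is a minimal sketch of that scenario and of the guard, using a stand-in Hydrant class rather than the real FireHydrant API, which is only partially visible in this hunk.)

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PersistGuardDemo
{
  static class Hydrant
  {
    private volatile Object index = new Object(); // stands in for the in-memory index
    private volatile boolean swapped = false;

    boolean hasSwapped()
    {
      return swapped;
    }

    void persistAndSwap()
    {
      index.toString();  // would throw NullPointerException if already swapped
      index = null;      // the in-memory index is replaced by an on-disk segment
      swapped = true;
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    final Hydrant hydrant = new Hydrant();

    Runnable persist = new Runnable()
    {
      @Override
      public void run()
      {
        if (hydrant.hasSwapped()) { // the check added by this patch
          System.out.println("already swapped, ignoring request to persist");
          return;
        }
        hydrant.persistAndSwap();
        System.out.println("persisted and swapped hydrant");
      }
    };

    // Both requests are queued before the first one runs; without the
    // hasSwapped() check the second run would hit the nulled-out index.
    ExecutorService persistExecutor = Executors.newSingleThreadExecutor();
    persistExecutor.submit(persist);
    persistExecutor.submit(persist);
    persistExecutor.shutdown();
    persistExecutor.awaitTermination(10, TimeUnit.SECONDS);
  }
}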