diff --git a/client/pom.xml b/client/pom.xml
index 27cc070c519..6979f20000c 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -28,7 +28,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.5-SNAPSHOT</version>
+ <version>0.5.6-SNAPSHOT</version>
diff --git a/common/pom.xml b/common/pom.xml
index ba47a0296bd..1964f5658ff 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.5-SNAPSHOT</version>
+ <version>0.5.6-SNAPSHOT</version>
diff --git a/examples/pom.xml b/examples/pom.xml
index 4a1cbc929ac..d3bd28bd236 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -9,7 +9,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.5-SNAPSHOT</version>
+ <version>0.5.6-SNAPSHOT</version>
diff --git a/indexing-common/pom.xml b/indexing-common/pom.xml
index 3971a922c5b..fd727328c49 100644
--- a/indexing-common/pom.xml
+++ b/indexing-common/pom.xml
@@ -28,7 +28,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.5-SNAPSHOT</version>
+ <version>0.5.6-SNAPSHOT</version>
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 1da0d489862..a8b8d429c79 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.5-SNAPSHOT</version>
+ <version>0.5.6-SNAPSHOT</version>
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index d27563fceea..37cb5b9c2e4 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.5-SNAPSHOT</version>
+ <version>0.5.6-SNAPSHOT</version>
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java
index 0ee2a6e7632..5bbfd73abbe 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java
@@ -24,6 +24,10 @@ import com.google.common.collect.Maps;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
+import com.metamx.druid.indexing.common.actions.TaskActionClient;
+import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
+import com.metamx.druid.indexing.common.config.TaskConfig;
+import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.MMappedQueryableIndexFactory;
@@ -31,10 +35,6 @@ import com.metamx.druid.loading.S3DataSegmentPuller;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.loading.SingleSegmentLoader;
-import com.metamx.druid.indexing.common.actions.TaskActionClient;
-import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
-import com.metamx.druid.indexing.common.config.TaskConfig;
-import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.emitter.service.ServiceEmitter;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@@ -141,9 +141,9 @@ public class TaskToolbox
new SegmentLoaderConfig()
{
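+ // getCacheDirectory() is now a String so the config can encode multiple cache
+ // locations; the single task work dir here parses as one (uncapped) location.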
@Override
- public File getCacheDirectory()
+ public String getCacheDirectory()
{
- return new File(getTaskWorkDir(), "fetched_segments");
+ return new File(getTaskWorkDir(), "fetched_segments").toString();
}
}
);
diff --git a/pom.xml b/pom.xml
index 743fe894033..6736f5cc377 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
- <version>0.5.5-SNAPSHOT</version>
+ <version>0.5.6-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
diff --git a/realtime/pom.xml b/realtime/pom.xml
index 829d724d95f..abe217c1af2 100644
--- a/realtime/pom.xml
+++ b/realtime/pom.xml
@@ -28,7 +28,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.5-SNAPSHOT</version>
+ <version>0.5.6-SNAPSHOT</version>
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java
index 5ba229ea56a..a429fbef9d5 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java
@@ -662,6 +662,14 @@ public class RealtimePlumberSchool implements PlumberSchool
*/
private int persistHydrant(FireHydrant indexToPersist, Schema schema, Interval interval)
{
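+ // A hydrant that has already swapped in its persisted segment holds no in-memory
+ // index left to write out, so a repeated persist request is safely a no-op.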
+ if (indexToPersist.hasSwapped()) {
+ log.info(
+ "DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
+ schema.getDataSource(), interval, indexToPersist
+ );
+ return 0;
+ }
+
log.info("DataSource[%s], Interval[%s], persisting Hydrant[%s]", schema.getDataSource(), interval, indexToPersist);
try {
int numRows = indexToPersist.getIndex().size();
diff --git a/server/pom.xml b/server/pom.xml
index 3f9b33f5ba8..e32b027219a 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -28,7 +28,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.5-SNAPSHOT</version>
+ <version>0.5.6-SNAPSHOT</version>
diff --git a/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java
index b821c653a6e..306d7f449af 100644
--- a/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java
+++ b/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java
@@ -39,9 +39,13 @@ public interface DataSegmentPuller
/**
* Returns the last modified time of the given segment.
*
+ * Note: this method is no longer used and need not be implemented. It remains only
+ * to preserve compatibility with existing implementations.
+ *
* @param segment The segment to check the last modified time for
* @return the last modified time in millis from the epoch
* @throws SegmentLoadingException if there are any errors
*/
+ @Deprecated
public long getLastModified(DataSegment segment) throws SegmentLoadingException;
}
diff --git a/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java b/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java
index bc44f82f3dd..e72bd787bb3 100644
--- a/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java
+++ b/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java
@@ -20,32 +20,14 @@
package com.metamx.druid.loading;
import com.google.common.base.Joiner;
-import com.metamx.common.MapUtils;
import com.metamx.druid.client.DataSegment;
-import java.util.Map;
-
/**
*/
public class DataSegmentPusherUtil
{
private static final Joiner JOINER = Joiner.on("/").skipNulls();
- public static String getLegacyStorageDir(DataSegment segment)
- {
- final Map<String, Object> loadSpec = segment.getLoadSpec();
-
- String specType = MapUtils.getString(loadSpec, "type");
- if (specType.startsWith("s3")) {
- String s3Bucket = MapUtils.getString(loadSpec, "bucket");
- String s3Path = MapUtils.getString(loadSpec, "key");
-
- return String.format("%s/%s", s3Bucket, s3Path.substring(0, s3Path.lastIndexOf("/")));
- }
-
- return null;
- }
-
public static String getStorageDir(DataSegment segment)
{
return JOINER.join(
diff --git a/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java b/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java
index 294c91b9a38..8a0e32484e1 100644
--- a/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java
@@ -21,14 +21,18 @@ package com.metamx.druid.loading;
import org.skife.config.Config;
-import java.io.File;
-
/**
*/
public abstract class SegmentLoaderConfig
{
@Config({"druid.paths.indexCache", "druid.segmentCache.path"})
- public abstract File getCacheDirectory();
+ public abstract String getCacheDirectory();
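+ // Now a String instead of a File: the value may list several cache locations,
+ // comma-separated, each optionally capped as "path|bytes" (hypothetical example:
+ // "/mnt/disk1|10000000000,/mnt/disk2"); SingleSegmentLoader parses it.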
+
+ @Config("druid.server.maxSize")
+ public long getServerMaxSize()
+ {
+ return Long.MAX_VALUE;
+ }
@Config("druid.segmentCache.deleteOnRemove")
public boolean deleteOnRemove()
diff --git a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java
index b184cf97395..f0e9a7f20e8 100644
--- a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java
+++ b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java
@@ -19,9 +19,13 @@
package com.metamx.druid.loading;
-import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
import com.google.inject.Inject;
-import com.metamx.common.StreamUtils;
+import com.metamx.common.IAE;
+import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.QueryableIndex;
@@ -30,8 +34,10 @@ import com.metamx.druid.index.Segment;
import org.apache.commons.io.FileUtils;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
/**
*/
@@ -41,8 +47,8 @@ public class SingleSegmentLoader implements SegmentLoader
private final DataSegmentPuller dataSegmentPuller;
private final QueryableIndexFactory factory;
- private final SegmentLoaderConfig config;
- private static final Joiner JOINER = Joiner.on("/").skipNulls();
+
+ private final List<StorageLocation> locations;
@Inject
public SingleSegmentLoader(
@@ -53,22 +59,52 @@ public class SingleSegmentLoader implements SegmentLoader
{
this.dataSegmentPuller = dataSegmentPuller;
this.factory = factory;
- this.config = config;
+
+ final ImmutableList.Builder<StorageLocation> locBuilder = ImmutableList.builder();
+
+ // Splitting on commas and pipes like this is an error-prone way to express the cache
+ // locations. It should eventually be replaced with a JSON array of objects, but that
+ // will have to wait for another day.
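+ // A value like "/mnt/disk1|10000000000,/mnt/disk2" (hypothetical paths) caps disk1
+ // at 10GB while disk2 is bounded only by druid.server.maxSize.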
+ for (String dirSpec : config.getCacheDirectory().split(",")) {
+ String[] dirSplit = dirSpec.split("\\|");
+ if (dirSplit.length == 1) {
+ locBuilder.add(new StorageLocation(new File(dirSplit[0]), config.getServerMaxSize()));
+ }
+ else if (dirSplit.length == 2) {
+ final Long maxSize = Longs.tryParse(dirSplit[1]);
+ if (maxSize == null) {
+ throw new IAE("Size of a local segment storage location must be an integral number, got[%s]", dirSplit[1]);
+ }
+ locBuilder.add(new StorageLocation(new File(dirSplit[0]), maxSize));
+ }
+ else {
+ throw new ISE(
+ "Segment storage location spec[%s] has [%s] parts, expected 1 or 2; config[%s].",
+ dirSpec, dirSplit.length, config.getCacheDirectory()
+ );
+ }
+ }
+ locations = locBuilder.build();
+
+ Preconditions.checkArgument(locations.size() > 0, "Must have at least one segment cache directory.");
+ log.info("Using storage locations[%s]", locations);
}
@Override
public boolean isSegmentLoaded(final DataSegment segment)
{
- File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
- if (localStorageDir.exists()) {
- return true;
- }
+ return findStorageLocationIfLoaded(segment) != null;
+ }
- final File legacyStorageDir = new File(
- config.getCacheDirectory(),
- DataSegmentPusherUtil.getLegacyStorageDir(segment)
- );
- return legacyStorageDir.exists();
+ public StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
+ {
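+ // Locations are scanned in configured order; the first one whose directory for
+ // this segment already exists wins. Returns null if no location has the segment.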
+ for (StorageLocation location : locations) {
+ File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
+ if (localStorageDir.exists()) {
+ return location;
+ }
+ }
+ return null;
}
@Override
@@ -82,111 +118,129 @@ public class SingleSegmentLoader implements SegmentLoader
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
- File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
+ StorageLocation loc = findStorageLocationIfLoaded(segment);
- final String legacyDir = DataSegmentPusherUtil.getLegacyStorageDir(segment);
- if (legacyDir != null) {
- File legacyStorageDir = new File(config.getCacheDirectory(), legacyDir);
+ final File retVal;
- if (legacyStorageDir.exists()) {
- log.info("Found legacyStorageDir[%s], moving to new storage location[%s]", legacyStorageDir, localStorageDir);
- if (localStorageDir.exists()) {
- try {
- FileUtils.deleteDirectory(localStorageDir);
- }
- catch (IOException e) {
- throw new SegmentLoadingException(e, "Error deleting localDir[%s]", localStorageDir);
- }
- }
- final File parentDir = localStorageDir.getParentFile();
- if (!parentDir.exists()) {
- log.info("Parent[%s] didn't exist, creating.", parentDir);
- if (!parentDir.mkdirs()) {
- log.warn("Unable to make parentDir[%s]", parentDir);
- }
- }
-
- if (!legacyStorageDir.renameTo(localStorageDir)) {
- log.warn("Failed moving [%s] to [%s]", legacyStorageDir, localStorageDir);
- }
+ if (loc == null) {
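+ // Nothing has this segment yet: greedily pick the configured location with the
+ // most free space, then verify it can actually hold the segment.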
+ Iterator<StorageLocation> locIter = locations.iterator();
+ loc = locIter.next();
+ while (locIter.hasNext()) {
+ loc = loc.mostEmpty(locIter.next());
}
- }
- if (localStorageDir.exists()) {
- long localLastModified = localStorageDir.lastModified();
- long remoteLastModified = dataSegmentPuller.getLastModified(segment);
- if (remoteLastModified > 0 && localLastModified >= remoteLastModified) {
- log.info(
- "Found localStorageDir[%s] with modified[%s], which is same or after remote[%s]. Using.",
- localStorageDir, localLastModified, remoteLastModified
- );
- return localStorageDir;
- }
- }
-
- if (localStorageDir.exists()) {
- try {
- FileUtils.deleteDirectory(localStorageDir);
- }
- catch (IOException e) {
- log.warn(e, "Exception deleting previously existing local dir[%s]", localStorageDir);
- }
- }
- if (!localStorageDir.mkdirs()) {
- log.info("Unable to make parent file[%s]", localStorageDir);
- }
-
- dataSegmentPuller.getSegmentFiles(segment, localStorageDir);
-
- return localStorageDir;
- }
-
- private File getLocalStorageDir(DataSegment segment)
- {
- String outputKey = JOINER.join(
- segment.getDataSource(),
- String.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
- segment.getVersion(),
- segment.getShardSpec().getPartitionNum()
- );
-
- return new File(config.getCacheDirectory(), outputKey);
- }
-
- private void moveToCache(File pulledFile, File cacheFile) throws SegmentLoadingException
- {
- log.info("Rename pulledFile[%s] to cacheFile[%s]", pulledFile, cacheFile);
- if (!pulledFile.renameTo(cacheFile)) {
- log.warn("Error renaming pulledFile[%s] to cacheFile[%s]. Copying instead.", pulledFile, cacheFile);
-
- try {
- StreamUtils.copyToFileAndClose(new FileInputStream(pulledFile), cacheFile);
- }
- catch (IOException e) {
- throw new SegmentLoadingException(
- e,
- "Problem moving pulledFile[%s] to cache[%s]",
- pulledFile,
- cacheFile
+ if (!loc.canHandle(segment.getSize())) {
+ throw new ISE(
+ "Segment[%s:%,d] too large for storage[%s:%,d].",
+ segment.getIdentifier(), segment.getSize(), loc.getPath(), loc.available()
);
}
- if (!pulledFile.delete()) {
- log.error("Could not delete pulledFile[%s].", pulledFile);
+
+ File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
+ if (!storageDir.mkdirs()) {
+ log.debug("Unable to make parent file[%s]", storageDir);
}
+
+ dataSegmentPuller.getSegmentFiles(segment, storageDir);
+
+ retVal = storageDir;
}
+ else {
+ retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
+ }
+
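+ // Register the segment with its location in both branches, so capacity accounting
+ // covers segments discovered already on disk as well as freshly pulled ones.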
+ loc.addSegment(segment);
+
+ return retVal;
}
@Override
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
- File cacheFile = getLocalStorageDir(segment);
+ StorageLocation loc = findStorageLocationIfLoaded(segment);
+
+ if (loc == null) {
+ log.info("Asked to cleanup something[%s] that didn't exist. Skipping.", segment);
+ return;
+ }
try {
+ File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
log.info("Deleting directory[%s]", cacheFile);
FileUtils.deleteDirectory(cacheFile);
+ loc.removeSegment(segment);
}
catch (IOException e) {
throw new SegmentLoadingException(e, e.getMessage());
}
}
+
+ private static class StorageLocation
+ {
+ private final File path;
+ private final long maxSize;
+ private final Set<DataSegment> segments;
+
+ private volatile long currSize = 0;
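+ // currSize tracks the total bytes of segments registered at this location; it is
+ // mutated only inside the synchronized addSegment/removeSegment methods.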
+
+ StorageLocation(
+ File path,
+ long maxSize
+ )
+ {
+ this.path = path;
+ this.maxSize = maxSize;
+
+ this.segments = Sets.newHashSet();
+ }
+
+ private File getPath()
+ {
+ return path;
+ }
+
+ private Long getMaxSize()
+ {
+ return maxSize;
+ }
+
+ private synchronized void addSegment(DataSegment segment)
+ {
+ if (segments.add(segment)) {
+ currSize += segment.getSize();
+ }
+ }
+
+ private synchronized void removeSegment(DataSegment segment)
+ {
+ if (segments.remove(segment)) {
+ currSize -= segment.getSize();
+ }
+ }
+
+ private boolean canHandle(long size)
+ {
+ return available() > size;
+ }
+
+ private synchronized long available()
+ {
+ return maxSize - currSize;
+ }
+
+ private StorageLocation mostEmpty(StorageLocation other)
+ {
+ return available() > other.available() ? this : other;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "StorageLocation{" +
+ "path=" + path +
+ ", maxSize=" + maxSize +
+ '}';
+ }
+ }
}
diff --git a/services/pom.xml b/services/pom.xml
index 18abf630390..d01c1f44706 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId>
<name>druid-services</name>
<description>druid-services</description>
- <version>0.5.5-SNAPSHOT</version>
+ <version>0.5.6-SNAPSHOT</version>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.5-SNAPSHOT</version>
+ <version>0.5.6-SNAPSHOT</version>