diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java index 0ee2a6e7632..5bbfd73abbe 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java @@ -24,6 +24,10 @@ import com.google.common.collect.Maps; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.ServerView; import com.metamx.druid.coordination.DataSegmentAnnouncer; +import com.metamx.druid.indexing.common.actions.TaskActionClient; +import com.metamx.druid.indexing.common.actions.TaskActionClientFactory; +import com.metamx.druid.indexing.common.config.TaskConfig; +import com.metamx.druid.indexing.common.task.Task; import com.metamx.druid.loading.DataSegmentKiller; import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.MMappedQueryableIndexFactory; @@ -31,10 +35,6 @@ import com.metamx.druid.loading.S3DataSegmentPuller; import com.metamx.druid.loading.SegmentLoaderConfig; import com.metamx.druid.loading.SegmentLoadingException; import com.metamx.druid.loading.SingleSegmentLoader; -import com.metamx.druid.indexing.common.actions.TaskActionClient; -import com.metamx.druid.indexing.common.actions.TaskActionClientFactory; -import com.metamx.druid.indexing.common.config.TaskConfig; -import com.metamx.druid.indexing.common.task.Task; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.emitter.service.ServiceEmitter; import org.jets3t.service.impl.rest.httpclient.RestS3Service; @@ -141,9 +141,9 @@ public class TaskToolbox new SegmentLoaderConfig() { @Override - public File getCacheDirectory() + public String getCacheDirectory() { - return new File(getTaskWorkDir(), "fetched_segments"); + return new File(getTaskWorkDir(), "fetched_segments").toString(); } } ); diff --git a/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java index b821c653a6e..306d7f449af 100644 --- a/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java +++ b/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java @@ -39,9 +39,13 @@ public interface DataSegmentPuller /** * Returns the last modified time of the given segment. * + * Note, this is not actually used at this point and doesn't need to actually be implemented. It's just still here + * to not break compatibility. + * * @param segment The segment to check the last modified time for * @return the last modified time in millis from the epoch * @throws SegmentLoadingException if there are any errors */ + @Deprecated public long getLastModified(DataSegment segment) throws SegmentLoadingException; } diff --git a/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java b/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java index bc44f82f3dd..e72bd787bb3 100644 --- a/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java +++ b/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java @@ -20,32 +20,14 @@ package com.metamx.druid.loading; import com.google.common.base.Joiner; -import com.metamx.common.MapUtils; import com.metamx.druid.client.DataSegment; -import java.util.Map; - /** */ public class DataSegmentPusherUtil { private static final Joiner JOINER = Joiner.on("/").skipNulls(); - public static String getLegacyStorageDir(DataSegment segment) - { - final Map loadSpec = segment.getLoadSpec(); - - String specType = MapUtils.getString(loadSpec, "type"); - if (specType.startsWith("s3")) { - String s3Bucket = MapUtils.getString(loadSpec, "bucket"); - String s3Path = MapUtils.getString(loadSpec, "key"); - - return String.format("%s/%s", s3Bucket, s3Path.substring(0, s3Path.lastIndexOf("/"))); - } - - return null; - } - public static String getStorageDir(DataSegment segment) { return JOINER.join( diff --git a/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java b/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java index 294c91b9a38..8a0e32484e1 100644 --- a/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java @@ -21,14 +21,18 @@ package com.metamx.druid.loading; import org.skife.config.Config; -import java.io.File; - /** */ public abstract class SegmentLoaderConfig { @Config({"druid.paths.indexCache", "druid.segmentCache.path"}) - public abstract File getCacheDirectory(); + public abstract String getCacheDirectory(); + + @Config("druid.server.maxSize") + public long getServerMaxSize() + { + return Long.MAX_VALUE; + } @Config("druid.segmentCache.deleteOnRemove") public boolean deleteOnRemove() diff --git a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java index 61e9986f484..f0e9a7f20e8 100644 --- a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java +++ b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java @@ -19,9 +19,13 @@ package com.metamx.druid.loading; -import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; +import com.google.common.primitives.Longs; import com.google.inject.Inject; -import com.metamx.common.StreamUtils; +import com.metamx.common.IAE; +import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.QueryableIndex; @@ -29,7 +33,11 @@ import com.metamx.druid.index.QueryableIndexSegment; import com.metamx.druid.index.Segment; import org.apache.commons.io.FileUtils; -import java.io.*; +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Set; /** */ @@ -39,8 +47,8 @@ public class SingleSegmentLoader implements SegmentLoader private final DataSegmentPuller dataSegmentPuller; private final QueryableIndexFactory factory; - private final SegmentLoaderConfig config; - private static final Joiner JOINER = Joiner.on("/").skipNulls(); + + private final List locations; @Inject public SingleSegmentLoader( @@ -51,22 +59,52 @@ public class SingleSegmentLoader implements SegmentLoader { this.dataSegmentPuller = dataSegmentPuller; this.factory = factory; - this.config = config; + + final ImmutableList.Builder locBuilder = ImmutableList.builder(); + + // This is a really, really stupid way of getting this information. Splitting on commas and bars is error-prone + // We should instead switch it up to be a JSON Array of JSON Object or something and cool stuff like that + // But, that'll have to wait for some other day. + for (String dirSpec : config.getCacheDirectory().split(",")) { + String[] dirSplit = dirSpec.split("\\|"); + if (dirSplit.length == 1) { + locBuilder.add(new StorageLocation(new File(dirSplit[0]), config.getServerMaxSize())); + } + else if (dirSplit.length == 2) { + final Long maxSize = Longs.tryParse(dirSplit[1]); + if (maxSize == null) { + throw new IAE("Size of a local segment storage location must be an integral number, got[%s]", dirSplit[1]); + } + locBuilder.add(new StorageLocation(new File(dirSplit[0]), maxSize)); + } + else { + throw new ISE( + "Unknown segment storage location[%s]=>[%s], config[%s].", + dirSplit.length, dirSpec, config.getCacheDirectory() + ); + } + } + locations = locBuilder.build(); + + Preconditions.checkArgument(locations.size() > 0, "Must have at least one segment cache directory."); + log.info("Using storage locations[%s]", locations); } @Override public boolean isSegmentLoaded(final DataSegment segment) { - File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment)); - if (localStorageDir.exists()) { - return true; - } + return findStorageLocationIfLoaded(segment) != null; + } - final File legacyStorageDir = new File( - config.getCacheDirectory(), - DataSegmentPusherUtil.getLegacyStorageDir(segment) - ); - return legacyStorageDir.exists(); + public StorageLocation findStorageLocationIfLoaded(final DataSegment segment) + { + for (StorageLocation location : locations) { + File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); + if (localStorageDir.exists()) { + return location; + } + } + return null; } @Override @@ -80,111 +118,129 @@ public class SingleSegmentLoader implements SegmentLoader public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException { - File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment)); + StorageLocation loc = findStorageLocationIfLoaded(segment); - final String legacyDir = DataSegmentPusherUtil.getLegacyStorageDir(segment); - if (legacyDir != null) { - File legacyStorageDir = new File(config.getCacheDirectory(), legacyDir); + final File retVal; - if (legacyStorageDir.exists()) { - log.info("Found legacyStorageDir[%s], moving to new storage location[%s]", legacyStorageDir, localStorageDir); - if (localStorageDir.exists()) { - try { - FileUtils.deleteDirectory(localStorageDir); - } - catch (IOException e) { - throw new SegmentLoadingException(e, "Error deleting localDir[%s]", localStorageDir); - } - } - final File parentDir = localStorageDir.getParentFile(); - if (!parentDir.exists()) { - log.info("Parent[%s] didn't exist, creating.", parentDir); - if (!parentDir.mkdirs()) { - log.warn("Unable to make parentDir[%s]", parentDir); - } - } - - if (!legacyStorageDir.renameTo(localStorageDir)) { - log.warn("Failed moving [%s] to [%s]", legacyStorageDir, localStorageDir); - } + if (loc == null) { + Iterator locIter = locations.iterator(); + loc = locIter.next(); + while (locIter.hasNext()) { + loc = loc.mostEmpty(locIter.next()); } - } - if (localStorageDir.exists()) { - long localLastModified = localStorageDir.lastModified(); - long remoteLastModified = dataSegmentPuller.getLastModified(segment); - if (remoteLastModified > 0 && localLastModified >= remoteLastModified) { - log.info( - "Found localStorageDir[%s] with modified[%s], which is same or after remote[%s]. Using.", - localStorageDir, localLastModified, remoteLastModified - ); - return localStorageDir; - } - } - - if (localStorageDir.exists()) { - try { - FileUtils.deleteDirectory(localStorageDir); - } - catch (IOException e) { - log.warn(e, "Exception deleting previously existing local dir[%s]", localStorageDir); - } - } - if (!localStorageDir.mkdirs()) { - log.info("Unable to make parent file[%s]", localStorageDir); - } - - dataSegmentPuller.getSegmentFiles(segment, localStorageDir); - - return localStorageDir; - } - - private File getLocalStorageDir(DataSegment segment) - { - String outputKey = JOINER.join( - segment.getDataSource(), - String.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()), - segment.getVersion(), - segment.getShardSpec().getPartitionNum() - ); - - return new File(config.getCacheDirectory(), outputKey); - } - - private void moveToCache(File pulledFile, File cacheFile) throws SegmentLoadingException - { - log.info("Rename pulledFile[%s] to cacheFile[%s]", pulledFile, cacheFile); - if (!pulledFile.renameTo(cacheFile)) { - log.warn("Error renaming pulledFile[%s] to cacheFile[%s]. Copying instead.", pulledFile, cacheFile); - - try { - StreamUtils.copyToFileAndClose(new FileInputStream(pulledFile), cacheFile); - } - catch (IOException e) { - throw new SegmentLoadingException( - e, - "Problem moving pulledFile[%s] to cache[%s]", - pulledFile, - cacheFile + if (!loc.canHandle(segment.getSize())) { + throw new ISE( + "Segment[%s:%,d] too large for storage[%s:%,d].", + segment.getIdentifier(), segment.getSize(), loc.getPath(), loc.available() ); } - if (!pulledFile.delete()) { - log.error("Could not delete pulledFile[%s].", pulledFile); + + File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); + if (!storageDir.mkdirs()) { + log.debug("Unable to make parent file[%s]", storageDir); } + + dataSegmentPuller.getSegmentFiles(segment, storageDir); + loc.addSegment(segment); + + retVal = storageDir; } + else { + retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); + } + + loc.addSegment(segment); + + return retVal; } @Override public void cleanup(DataSegment segment) throws SegmentLoadingException { - File cacheFile = getLocalStorageDir(segment); + StorageLocation loc = findStorageLocationIfLoaded(segment); + + if (loc == null) { + log.info("Asked to cleanup something[%s] that didn't exist. Skipping.", segment); + return; + } try { + File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); log.info("Deleting directory[%s]", cacheFile); FileUtils.deleteDirectory(cacheFile); + loc.removeSegment(segment); } catch (IOException e) { throw new SegmentLoadingException(e, e.getMessage()); } } + + private static class StorageLocation + { + private final File path; + private final long maxSize; + private final Set segments; + + private volatile long currSize = 0; + + StorageLocation( + File path, + long maxSize + ) + { + this.path = path; + this.maxSize = maxSize; + + this.segments = Sets.newHashSet(); + } + + private File getPath() + { + return path; + } + + private Long getMaxSize() + { + return maxSize; + } + + private synchronized void addSegment(DataSegment segment) + { + if (! segments.add(segment)) { + currSize += segment.getSize(); + } + } + + private synchronized void removeSegment(DataSegment segment) + { + if (segments.remove(segment)) { + currSize -= segment.getSize(); + } + } + + private boolean canHandle(long size) + { + return available() > size; + } + + private synchronized long available() + { + return maxSize - currSize; + } + + private StorageLocation mostEmpty(StorageLocation other) + { + return available() > other.available() ? this : other; + } + + @Override + public String toString() + { + return "StorageLocation{" + + "path=" + path + + ", maxSize=" + maxSize + + '}'; + } + } }