1) Adjust SingleSegmentLoader to allow for storing segments on multiple different mount points. The specification language is really janky right now, so this is remaining a stealth feature for the time being.

This commit is contained in:
cheddar 2013-07-01 14:56:57 -07:00
parent e8afeda046
commit 797a083b69
5 changed files with 173 additions and 127 deletions

View File

@ -24,6 +24,10 @@ import com.google.common.collect.Maps;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.ServerView; import com.metamx.druid.client.ServerView;
import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.indexing.common.actions.TaskActionClient;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.loading.DataSegmentKiller; import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.MMappedQueryableIndexFactory; import com.metamx.druid.loading.MMappedQueryableIndexFactory;
@ -31,10 +35,6 @@ import com.metamx.druid.loading.S3DataSegmentPuller;
import com.metamx.druid.loading.SegmentLoaderConfig; import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SegmentLoadingException; import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.loading.SingleSegmentLoader; import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.indexing.common.actions.TaskActionClient;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@ -141,9 +141,9 @@ public class TaskToolbox
new SegmentLoaderConfig() new SegmentLoaderConfig()
{ {
@Override @Override
public File getCacheDirectory() public String getCacheDirectory()
{ {
return new File(getTaskWorkDir(), "fetched_segments"); return new File(getTaskWorkDir(), "fetched_segments").toString();
} }
} }
); );

View File

@ -39,9 +39,13 @@ public interface DataSegmentPuller
/** /**
* Returns the last modified time of the given segment. * Returns the last modified time of the given segment.
* *
* Note, this is not actually used at this point and doesn't need to actually be implemented. It's just still here
* to not break compatibility.
*
* @param segment The segment to check the last modified time for * @param segment The segment to check the last modified time for
* @return the last modified time in millis from the epoch * @return the last modified time in millis from the epoch
* @throws SegmentLoadingException if there are any errors * @throws SegmentLoadingException if there are any errors
*/ */
@Deprecated
public long getLastModified(DataSegment segment) throws SegmentLoadingException; public long getLastModified(DataSegment segment) throws SegmentLoadingException;
} }

View File

@ -20,32 +20,14 @@
package com.metamx.druid.loading; package com.metamx.druid.loading;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.metamx.common.MapUtils;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import java.util.Map;
/** /**
*/ */
public class DataSegmentPusherUtil public class DataSegmentPusherUtil
{ {
private static final Joiner JOINER = Joiner.on("/").skipNulls(); private static final Joiner JOINER = Joiner.on("/").skipNulls();
public static String getLegacyStorageDir(DataSegment segment)
{
final Map<String,Object> loadSpec = segment.getLoadSpec();
String specType = MapUtils.getString(loadSpec, "type");
if (specType.startsWith("s3")) {
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
return String.format("%s/%s", s3Bucket, s3Path.substring(0, s3Path.lastIndexOf("/")));
}
return null;
}
public static String getStorageDir(DataSegment segment) public static String getStorageDir(DataSegment segment)
{ {
return JOINER.join( return JOINER.join(

View File

@ -21,14 +21,18 @@ package com.metamx.druid.loading;
import org.skife.config.Config; import org.skife.config.Config;
import java.io.File;
/** /**
*/ */
public abstract class SegmentLoaderConfig public abstract class SegmentLoaderConfig
{ {
@Config({"druid.paths.indexCache", "druid.segmentCache.path"}) @Config({"druid.paths.indexCache", "druid.segmentCache.path"})
public abstract File getCacheDirectory(); public abstract String getCacheDirectory();
@Config("druid.server.maxSize")
public long getServerMaxSize()
{
return Long.MAX_VALUE;
}
@Config("druid.segmentCache.deleteOnRemove") @Config("druid.segmentCache.deleteOnRemove")
public boolean deleteOnRemove() public boolean deleteOnRemove()

View File

@ -19,9 +19,13 @@
package com.metamx.druid.loading; package com.metamx.druid.loading;
import com.google.common.base.Joiner; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.StreamUtils; import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.QueryableIndex; import com.metamx.druid.index.QueryableIndex;
@ -29,7 +33,11 @@ import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.Segment; import com.metamx.druid.index.Segment;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import java.io.*; import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/** /**
*/ */
@ -39,8 +47,8 @@ public class SingleSegmentLoader implements SegmentLoader
private final DataSegmentPuller dataSegmentPuller; private final DataSegmentPuller dataSegmentPuller;
private final QueryableIndexFactory factory; private final QueryableIndexFactory factory;
private final SegmentLoaderConfig config;
private static final Joiner JOINER = Joiner.on("/").skipNulls(); private final List<StorageLocation> locations;
@Inject @Inject
public SingleSegmentLoader( public SingleSegmentLoader(
@ -51,22 +59,52 @@ public class SingleSegmentLoader implements SegmentLoader
{ {
this.dataSegmentPuller = dataSegmentPuller; this.dataSegmentPuller = dataSegmentPuller;
this.factory = factory; this.factory = factory;
this.config = config;
final ImmutableList.Builder<StorageLocation> locBuilder = ImmutableList.builder();
// This is a really, really stupid way of getting this information. Splitting on commas and bars is error-prone
// We should instead switch it up to be a JSON Array of JSON Object or something and cool stuff like that
// But, that'll have to wait for some other day.
for (String dirSpec : config.getCacheDirectory().split(",")) {
String[] dirSplit = dirSpec.split("\\|");
if (dirSplit.length == 1) {
locBuilder.add(new StorageLocation(new File(dirSplit[0]), config.getServerMaxSize()));
}
else if (dirSplit.length == 2) {
final Long maxSize = Longs.tryParse(dirSplit[1]);
if (maxSize == null) {
throw new IAE("Size of a local segment storage location must be an integral number, got[%s]", dirSplit[1]);
}
locBuilder.add(new StorageLocation(new File(dirSplit[0]), maxSize));
}
else {
throw new ISE(
"Unknown segment storage location[%s]=>[%s], config[%s].",
dirSplit.length, dirSpec, config.getCacheDirectory()
);
}
}
locations = locBuilder.build();
Preconditions.checkArgument(locations.size() > 0, "Must have at least one segment cache directory.");
log.info("Using storage locations[%s]", locations);
} }
@Override @Override
public boolean isSegmentLoaded(final DataSegment segment) public boolean isSegmentLoaded(final DataSegment segment)
{ {
File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment)); return findStorageLocationIfLoaded(segment) != null;
if (localStorageDir.exists()) { }
return true;
}
final File legacyStorageDir = new File( public StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
config.getCacheDirectory(), {
DataSegmentPusherUtil.getLegacyStorageDir(segment) for (StorageLocation location : locations) {
); File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
return legacyStorageDir.exists(); if (localStorageDir.exists()) {
return location;
}
}
return null;
} }
@Override @Override
@ -80,111 +118,129 @@ public class SingleSegmentLoader implements SegmentLoader
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{ {
File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment)); StorageLocation loc = findStorageLocationIfLoaded(segment);
final String legacyDir = DataSegmentPusherUtil.getLegacyStorageDir(segment); final File retVal;
if (legacyDir != null) {
File legacyStorageDir = new File(config.getCacheDirectory(), legacyDir);
if (legacyStorageDir.exists()) { if (loc == null) {
log.info("Found legacyStorageDir[%s], moving to new storage location[%s]", legacyStorageDir, localStorageDir); Iterator<StorageLocation> locIter = locations.iterator();
if (localStorageDir.exists()) { loc = locIter.next();
try { while (locIter.hasNext()) {
FileUtils.deleteDirectory(localStorageDir); loc = loc.mostEmpty(locIter.next());
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Error deleting localDir[%s]", localStorageDir);
}
}
final File parentDir = localStorageDir.getParentFile();
if (!parentDir.exists()) {
log.info("Parent[%s] didn't exist, creating.", parentDir);
if (!parentDir.mkdirs()) {
log.warn("Unable to make parentDir[%s]", parentDir);
}
}
if (!legacyStorageDir.renameTo(localStorageDir)) {
log.warn("Failed moving [%s] to [%s]", legacyStorageDir, localStorageDir);
}
} }
}
if (localStorageDir.exists()) { if (!loc.canHandle(segment.getSize())) {
long localLastModified = localStorageDir.lastModified(); throw new ISE(
long remoteLastModified = dataSegmentPuller.getLastModified(segment); "Segment[%s:%,d] too large for storage[%s:%,d].",
if (remoteLastModified > 0 && localLastModified >= remoteLastModified) { segment.getIdentifier(), segment.getSize(), loc.getPath(), loc.available()
log.info(
"Found localStorageDir[%s] with modified[%s], which is same or after remote[%s]. Using.",
localStorageDir, localLastModified, remoteLastModified
);
return localStorageDir;
}
}
if (localStorageDir.exists()) {
try {
FileUtils.deleteDirectory(localStorageDir);
}
catch (IOException e) {
log.warn(e, "Exception deleting previously existing local dir[%s]", localStorageDir);
}
}
if (!localStorageDir.mkdirs()) {
log.info("Unable to make parent file[%s]", localStorageDir);
}
dataSegmentPuller.getSegmentFiles(segment, localStorageDir);
return localStorageDir;
}
private File getLocalStorageDir(DataSegment segment)
{
String outputKey = JOINER.join(
segment.getDataSource(),
String.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
segment.getVersion(),
segment.getShardSpec().getPartitionNum()
);
return new File(config.getCacheDirectory(), outputKey);
}
private void moveToCache(File pulledFile, File cacheFile) throws SegmentLoadingException
{
log.info("Rename pulledFile[%s] to cacheFile[%s]", pulledFile, cacheFile);
if (!pulledFile.renameTo(cacheFile)) {
log.warn("Error renaming pulledFile[%s] to cacheFile[%s]. Copying instead.", pulledFile, cacheFile);
try {
StreamUtils.copyToFileAndClose(new FileInputStream(pulledFile), cacheFile);
}
catch (IOException e) {
throw new SegmentLoadingException(
e,
"Problem moving pulledFile[%s] to cache[%s]",
pulledFile,
cacheFile
); );
} }
if (!pulledFile.delete()) {
log.error("Could not delete pulledFile[%s].", pulledFile); File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
if (!storageDir.mkdirs()) {
log.debug("Unable to make parent file[%s]", storageDir);
} }
dataSegmentPuller.getSegmentFiles(segment, storageDir);
loc.addSegment(segment);
retVal = storageDir;
} }
else {
retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
}
loc.addSegment(segment);
return retVal;
} }
@Override @Override
public void cleanup(DataSegment segment) throws SegmentLoadingException public void cleanup(DataSegment segment) throws SegmentLoadingException
{ {
File cacheFile = getLocalStorageDir(segment); StorageLocation loc = findStorageLocationIfLoaded(segment);
if (loc == null) {
log.info("Asked to cleanup something[%s] that didn't exist. Skipping.", segment);
return;
}
try { try {
File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
log.info("Deleting directory[%s]", cacheFile); log.info("Deleting directory[%s]", cacheFile);
FileUtils.deleteDirectory(cacheFile); FileUtils.deleteDirectory(cacheFile);
loc.removeSegment(segment);
} }
catch (IOException e) { catch (IOException e) {
throw new SegmentLoadingException(e, e.getMessage()); throw new SegmentLoadingException(e, e.getMessage());
} }
} }
private static class StorageLocation
{
private final File path;
private final long maxSize;
private final Set<DataSegment> segments;
private volatile long currSize = 0;
StorageLocation(
File path,
long maxSize
)
{
this.path = path;
this.maxSize = maxSize;
this.segments = Sets.newHashSet();
}
private File getPath()
{
return path;
}
private Long getMaxSize()
{
return maxSize;
}
private synchronized void addSegment(DataSegment segment)
{
if (! segments.add(segment)) {
currSize += segment.getSize();
}
}
private synchronized void removeSegment(DataSegment segment)
{
if (segments.remove(segment)) {
currSize -= segment.getSize();
}
}
private boolean canHandle(long size)
{
return available() > size;
}
private synchronized long available()
{
return maxSize - currSize;
}
private StorageLocation mostEmpty(StorageLocation other)
{
return available() > other.available() ? this : other;
}
@Override
public String toString()
{
return "StorageLocation{" +
"path=" + path +
", maxSize=" + maxSize +
'}';
}
}
} }