Working toward making it easier to add new SegmentPullers.

1) Move the local cacheFile logic out of the S3 pullers into the SingleSegmentLoader
2) Make the S3SegmentPuller just pull down the file
3) Make the Loader do the unzip, gunzip, or rename
4) 2 and 3 make S3ZippedSegmentPuller not necessary (still there, just deprecated and empty)
5) Tweak the TaskToolbox so that the Pullers returned by getSegmentGetters behave the same as they did before
James Estes 2013-02-09 22:33:00 -07:00 committed by Eric Tschetter
parent 824e3c0eb2
commit d1626576c0
6 changed files with 219 additions and 282 deletions
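For orientation, here is a minimal, hypothetical sketch of the split the commit message describes. It is not the committed code: the names RawFilePuller and CachingLoader are made up for illustration, and the real types are SegmentPuller, S3SegmentPuller and SingleSegmentLoader shown in the diffs below.

import java.io.File;

// Hypothetical sketch: a puller only fetches one raw file and reports its remote
// last-modified time; the loader owns the cache directory and decides whether to
// unzip, gunzip, or rename what was pulled.
interface RawFilePuller
{
  File pull(String key) throws Exception;          // download to a temp file, nothing more
  long lastModified(String key) throws Exception;  // epoch millis, or -1 if unknown
}

class CachingLoader
{
  private final RawFilePuller puller;
  private final File cacheDir;

  CachingLoader(RawFilePuller puller, File cacheDir)
  {
    this.puller = puller;
    this.cacheDir = cacheDir;
  }

  File get(String key) throws Exception
  {
    File cached = new File(cacheDir, key.replace('/', '_'));
    long remote = puller.lastModified(key);
    // Reuse the cache only when the remote timestamp is known and the local copy is not older.
    if (cached.exists() && remote > 0 && cached.lastModified() >= remote) {
      return cached;
    }
    File pulled = puller.pull(key);
    cached.getParentFile().mkdirs();
    if (pulled.getName().endsWith(".zip")) {
      // unzip pulled into cached's directory (ZipInputStream loop, elided)
    } else if (pulled.getName().endsWith(".gz")) {
      // gunzip pulled into cached (GZIPInputStream copy, elided)
    } else if (!pulled.renameTo(cached)) {
      throw new Exception(String.format("Could not move [%s] to [%s]", pulled, cached));
    }
    return cached;
  }
}

With that split, adding a new source of segments means implementing only the pull and lastModified pair; caching and unpacking stay in one place.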


@@ -21,13 +21,15 @@ package com.metamx.druid.merger.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.loading.MMappedQueryableIndexFactory;
import com.metamx.druid.loading.S3SegmentPuller;
import com.metamx.druid.loading.S3SegmentGetterConfig;
import com.metamx.druid.loading.S3ZippedSegmentPuller;
import com.metamx.druid.loading.SegmentPuller;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.loading.StorageAdapterLoadingException;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.emitter.service.ServiceEmitter;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@@ -88,19 +90,28 @@ public class TaskToolbox
public Map<String, SegmentPuller> getSegmentGetters(final Task task)
{
final S3SegmentGetterConfig getterConfig = new S3SegmentGetterConfig()
{
@Override
public File getCacheDirectory()
{
return new File(config.getTaskDir(task), "fetched_segments");
}
};
LoaderPullerAdapter puller = new LoaderPullerAdapter(new File(config.getTaskDir(task), "fetched_segments"));
return ImmutableMap.<String, SegmentPuller>builder()
.put("s3", new S3SegmentPuller(s3Client, getterConfig))
.put("s3_union", new S3SegmentPuller(s3Client, getterConfig))
.put("s3_zip", new S3ZippedSegmentPuller(s3Client, getterConfig))
.put("s3", puller)
.put("s3_union", puller)
.put("s3_zip", puller)
.build();
}
class LoaderPullerAdapter implements SegmentPuller{
private SingleSegmentLoader loader;
public LoaderPullerAdapter(File cacheDir){
loader = new SingleSegmentLoader(new S3SegmentPuller(s3Client), new MMappedQueryableIndexFactory(), cacheDir);
}
@Override
public File getSegmentFiles(DataSegment loadSpec) throws StorageAdapterLoadingException {
return loader.getSegmentFiles(loadSpec);
}
@Override
public long getLastModified(DataSegment segment) throws StorageAdapterLoadingException {
return -1;
}
}
}


@@ -35,7 +35,6 @@ import com.metamx.druid.query.group.GroupByQueryEngineConfig;
import com.metamx.druid.Query;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.loading.QueryableLoaderConfig;
import com.metamx.druid.loading.S3ZippedSegmentPuller;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.group.GroupByQuery;
@@ -69,8 +68,8 @@ public class ServerInit
{
DelegatingSegmentLoader delegateLoader = new DelegatingSegmentLoader();
final S3SegmentPuller segmentGetter = new S3SegmentPuller(s3Client, config);
final S3ZippedSegmentPuller zippedGetter = new S3ZippedSegmentPuller(s3Client, config);
final S3SegmentPuller segmentGetter = new S3SegmentPuller(s3Client);
final QueryableIndexFactory factory;
if ("mmap".equals(config.getQueryableFactoryType())) {
factory = new MMappedQueryableIndexFactory();
@@ -78,10 +77,11 @@ public class ServerInit
throw new ISE("Unknown queryableFactoryType[%s]", config.getQueryableFactoryType());
}
SingleSegmentLoader segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config.getCacheDirectory());
delegateLoader.setLoaderTypes(
ImmutableMap.<String, SegmentLoader>builder()
.put("s3", new SingleSegmentLoader(segmentGetter, factory))
.put("s3_zip", new SingleSegmentLoader(zippedGetter, factory))
.put("s3", segmentLoader)
.put("s3_zip", segmentLoader)
.build()
);


@@ -25,17 +25,15 @@ import com.metamx.common.StreamUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.common.s3.S3Utils;
import org.apache.commons.io.FileUtils;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import org.joda.time.DateTime;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;
import java.util.zip.GZIPInputStream;
/**
*/
@@ -48,133 +46,85 @@ public class S3SegmentPuller implements SegmentPuller
private static final String KEY = "key";
private final RestS3Service s3Client;
private final S3SegmentGetterConfig config;
@Inject
public S3SegmentPuller(
RestS3Service s3Client,
S3SegmentGetterConfig config
RestS3Service s3Client
)
{
this.s3Client = s3Client;
this.config = config;
}
@Override
public File getSegmentFiles(DataSegment segment) throws StorageAdapterLoadingException
{
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
S3Coords s3Coords = new S3Coords(segment);
log.info("Loading index at path[s3://%s/%s]", s3Bucket, s3Path);
log.info("Loading index at path[%s]", s3Coords);
S3Object s3Obj = null;
File tmpFile = null;
try {
if (!s3Client.isObjectInBucket(s3Bucket, s3Path)) {
throw new StorageAdapterLoadingException("IndexFile[s3://%s/%s] does not exist.", s3Bucket, s3Path);
}
File cacheFile = new File(config.getCacheDirectory(), computeCacheFilePath(s3Bucket, s3Path));
if (cacheFile.exists()) {
S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(s3Bucket), s3Path);
DateTime cacheFileLastModified = new DateTime(cacheFile.lastModified());
DateTime s3ObjLastModified = new DateTime(objDetails.getLastModifiedDate().getTime());
if (cacheFileLastModified.isAfter(s3ObjLastModified)) {
log.info(
"Found cacheFile[%s] with modified[%s], which is after s3Obj[%s]. Using.",
cacheFile,
cacheFileLastModified,
s3ObjLastModified
);
return cacheFile.getParentFile();
}
FileUtils.deleteDirectory(cacheFile.getParentFile());
if(!isObjectInBucket(s3Coords)){
throw new StorageAdapterLoadingException("IndexFile[%s] does not exist.", s3Coords);
}
long currTime = System.currentTimeMillis();
File tmpFile = null;
S3Object s3Obj = null;
tmpFile = File.createTempFile(s3Bucket, new DateTime().toString());
log.info(
"Downloading file[s3://%s/%s] to local tmpFile[%s] for cacheFile[%s]",
s3Bucket, s3Path, tmpFile, cacheFile
);
try {
s3Obj = s3Client.getObject(new S3Bucket(s3Coords.bucket), s3Coords.path);
tmpFile = File.createTempFile(s3Coords.bucket, new DateTime().toString() + s3Coords.path.replace('/', '_'));
log.info("Downloading file[%s] to local tmpFile[%s] for segment[%s]", s3Coords, tmpFile, segment);
s3Obj = s3Client.getObject(new S3Bucket(s3Bucket), s3Path);
StreamUtils.copyToFileAndClose(s3Obj.getDataInputStream(), tmpFile, DEFAULT_TIMEOUT);
final long downloadEndTime = System.currentTimeMillis();
log.info("Download of file[%s] completed in %,d millis", cacheFile, downloadEndTime - currTime);
log.info("Download of file[%s] completed in %,d millis", tmpFile, downloadEndTime - currTime);
if (!cacheFile.getParentFile().mkdirs()) {
log.info("Unable to make parent file[%s]", cacheFile.getParentFile());
}
cacheFile.delete();
if (s3Path.endsWith("gz")) {
log.info("Decompressing file[%s] to [%s]", tmpFile, cacheFile);
StreamUtils.copyToFileAndClose(
new GZIPInputStream(new FileInputStream(tmpFile)),
cacheFile
);
if (!tmpFile.delete()) {
log.error("Could not delete tmpFile[%s].", tmpFile);
}
} else {
log.info("Rename tmpFile[%s] to cacheFile[%s]", tmpFile, cacheFile);
if (!tmpFile.renameTo(cacheFile)) {
log.warn("Error renaming tmpFile[%s] to cacheFile[%s]. Copying instead.", tmpFile, cacheFile);
StreamUtils.copyToFileAndClose(new FileInputStream(tmpFile), cacheFile);
if (!tmpFile.delete()) {
log.error("Could not delete tmpFile[%s].", tmpFile);
}
}
}
long endTime = System.currentTimeMillis();
log.info("Local processing of file[%s] done in %,d millis", cacheFile, endTime - downloadEndTime);
return cacheFile.getParentFile();
return tmpFile;
}
catch (Exception e) {
if(tmpFile!=null && tmpFile.exists()){
tmpFile.delete();
}
throw new StorageAdapterLoadingException(e, e.getMessage());
}
finally {
S3Utils.closeStreamsQuietly(s3Obj);
if (tmpFile != null && tmpFile.exists()) {
log.warn("Deleting tmpFile[%s] in finally block. Why?", tmpFile);
tmpFile.delete();
}
}
}
private String computeCacheFilePath(String s3Bucket, String s3Path)
{
return String.format(
"%s/%s", s3Bucket, s3Path.endsWith("gz") ? s3Path.substring(0, s3Path.length() - ".gz".length()) : s3Path
);
private boolean isObjectInBucket(S3Coords coords) throws StorageAdapterLoadingException {
try {
return s3Client.isObjectInBucket(coords.bucket, coords.path);
} catch (ServiceException e) {
throw new StorageAdapterLoadingException(e, "Problem communicating with S3 checking bucket/path[%s]", coords);
}
}
@Override
public boolean cleanSegmentFiles(DataSegment segment) throws StorageAdapterLoadingException
{
Map<String, Object> loadSpec = segment.getLoadSpec();
File cacheFile = new File(
config.getCacheDirectory(),
computeCacheFilePath(MapUtils.getString(loadSpec, BUCKET), MapUtils.getString(loadSpec, KEY))
);
public long getLastModified(DataSegment segment) throws StorageAdapterLoadingException {
S3Coords coords = new S3Coords(segment);
try {
final File parentFile = cacheFile.getParentFile();
log.info("Recursively deleting file[%s]", parentFile);
FileUtils.deleteDirectory(parentFile);
}
catch (IOException e) {
S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(coords.bucket), coords.path);
return objDetails.getLastModifiedDate().getTime();
} catch (S3ServiceException e) {
throw new StorageAdapterLoadingException(e, e.getMessage());
}
}
return true;
private class S3Coords {
String bucket;
String path;
public S3Coords(DataSegment segment) {
Map<String, Object> loadSpec = segment.getLoadSpec();
bucket = MapUtils.getString(loadSpec, BUCKET);
path = MapUtils.getString(loadSpec, KEY);
if(path.startsWith("/")){
path = path.substring(1);
}
}
public String toString(){
return String.format("s3://%s/%s", bucket, path);
}
}
}


@@ -19,169 +19,14 @@
package com.metamx.druid.loading;
import com.google.common.io.Closeables;
import com.metamx.common.MapUtils;
import com.metamx.common.StreamUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.common.s3.S3Utils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import org.joda.time.DateTime;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
/**
* @deprecated
*/
public class S3ZippedSegmentPuller implements SegmentPuller
public class S3ZippedSegmentPuller extends S3SegmentPuller
{
private static final Logger log = new Logger(S3ZippedSegmentPuller.class);
private static final String BUCKET = "bucket";
private static final String KEY = "key";
private final RestS3Service s3Client;
private final S3SegmentGetterConfig config;
public S3ZippedSegmentPuller(
RestS3Service s3Client,
S3SegmentGetterConfig config
)
{
this.s3Client = s3Client;
this.config = config;
}
@Override
public File getSegmentFiles(DataSegment segment) throws StorageAdapterLoadingException
{
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
if (s3Path.startsWith("/")) {
s3Path = s3Path.substring(1);
}
log.info("Loading index at path[s3://%s/%s]", s3Bucket, s3Path);
S3Object s3Obj = null;
File tmpFile = null;
try {
if (!s3Client.isObjectInBucket(s3Bucket, s3Path)) {
throw new StorageAdapterLoadingException("IndexFile[s3://%s/%s] does not exist.", s3Bucket, s3Path);
}
File cacheFile = new File(config.getCacheDirectory(), computeCacheFilePath(s3Bucket, s3Path));
if (cacheFile.exists()) {
S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(s3Bucket), s3Path);
DateTime cacheFileLastModified = new DateTime(cacheFile.lastModified());
DateTime s3ObjLastModified = new DateTime(objDetails.getLastModifiedDate().getTime());
if (cacheFileLastModified.isAfter(s3ObjLastModified)) {
log.info(
"Found cacheFile[%s] with modified[%s], which is after s3Obj[%s]. Using.",
cacheFile,
cacheFileLastModified,
s3ObjLastModified
);
return cacheFile;
}
FileUtils.deleteDirectory(cacheFile);
}
long currTime = System.currentTimeMillis();
tmpFile = File.createTempFile(s3Bucket, new DateTime().toString());
log.info(
"Downloading file[s3://%s/%s] to local tmpFile[%s] for cacheFile[%s]",
s3Bucket, s3Path, tmpFile, cacheFile
);
s3Obj = s3Client.getObject(new S3Bucket(s3Bucket), s3Path);
StreamUtils.copyToFileAndClose(s3Obj.getDataInputStream(), tmpFile);
final long downloadEndTime = System.currentTimeMillis();
log.info("Download of file[%s] completed in %,d millis", cacheFile, downloadEndTime - currTime);
if (cacheFile.exists()) {
FileUtils.deleteDirectory(cacheFile);
}
cacheFile.mkdirs();
ZipInputStream zipIn = null;
OutputStream out = null;
ZipEntry entry = null;
try {
zipIn = new ZipInputStream(new BufferedInputStream(new FileInputStream(tmpFile)));
while ((entry = zipIn.getNextEntry()) != null) {
out = new FileOutputStream(new File(cacheFile, entry.getName()));
IOUtils.copy(zipIn, out);
zipIn.closeEntry();
Closeables.closeQuietly(out);
out = null;
}
}
finally {
Closeables.closeQuietly(out);
Closeables.closeQuietly(zipIn);
}
long endTime = System.currentTimeMillis();
log.info("Local processing of file[%s] done in %,d millis", cacheFile, endTime - downloadEndTime);
log.info("Deleting tmpFile[%s]", tmpFile);
tmpFile.delete();
return cacheFile;
}
catch (Exception e) {
throw new StorageAdapterLoadingException(e, e.getMessage());
}
finally {
S3Utils.closeStreamsQuietly(s3Obj);
if (tmpFile != null && tmpFile.exists()) {
log.warn("Deleting tmpFile[%s] in finally block. Why?", tmpFile);
tmpFile.delete();
}
}
}
private String computeCacheFilePath(String s3Bucket, String s3Path)
{
return new File(String.format("%s/%s", s3Bucket, s3Path)).getParent();
}
@Override
public boolean cleanSegmentFiles(DataSegment segment) throws StorageAdapterLoadingException
{
Map<String, Object> loadSpec = segment.getLoadSpec();
File cacheFile = new File(
config.getCacheDirectory(),
computeCacheFilePath(
MapUtils.getString(loadSpec, BUCKET),
MapUtils.getString(loadSpec, KEY)
)
);
try {
log.info("Deleting directory[%s]", cacheFile);
FileUtils.deleteDirectory(cacheFile);
}
catch (IOException e) {
throw new StorageAdapterLoadingException(e, e.getMessage());
}
return true;
public S3ZippedSegmentPuller(RestS3Service s3Client) {
super(s3Client);
}
}


@@ -29,5 +29,5 @@ import java.util.Map;
public interface SegmentPuller
{
public File getSegmentFiles(DataSegment loadSpec) throws StorageAdapterLoadingException;
public boolean cleanSegmentFiles(DataSegment loadSpec) throws StorageAdapterLoadingException;
long getLastModified(DataSegment segment) throws StorageAdapterLoadingException;
}
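With cleanSegmentFiles gone, a new puller has only these two methods to implement. As a purely hypothetical illustration (not part of this commit), a puller for segments that already sit on local disk might look roughly like this, assuming a load spec that carries a "path" entry:

package com.metamx.druid.loading;

import com.metamx.common.MapUtils;
import com.metamx.druid.client.DataSegment;

import java.io.File;
import java.util.Map;

// Hypothetical example only; "path" as a load-spec key is assumed for illustration.
public class LocalFileSegmentPuller implements SegmentPuller
{
  @Override
  public File getSegmentFiles(DataSegment segment) throws StorageAdapterLoadingException
  {
    Map<String, Object> loadSpec = segment.getLoadSpec();
    File file = new File(MapUtils.getString(loadSpec, "path"));
    if (!file.exists()) {
      throw new StorageAdapterLoadingException("IndexFile[%s] does not exist.", file);
    }
    // Just hand back the raw file; SingleSegmentLoader decides whether to unzip,
    // gunzip, or rename it into its cache directory.
    return file;
  }

  @Override
  public long getLastModified(DataSegment segment) throws StorageAdapterLoadingException
  {
    File file = new File(MapUtils.getString(segment.getLoadSpec(), "path"));
    return file.exists() ? file.lastModified() : -1;
  }
}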


@@ -19,40 +19,171 @@
package com.metamx.druid.loading;
import com.google.common.base.Joiner;
import com.google.common.io.Closeables;
import com.google.inject.Inject;
import com.metamx.common.StreamUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.Segment;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
/**
*/
public class SingleSegmentLoader implements SegmentLoader
{
private static final Logger log = new Logger(SingleSegmentLoader.class);
private final SegmentPuller segmentPuller;
private final QueryableIndexFactory factory;
private File cacheDirectory;
private static final Joiner JOINER = Joiner.on("/").skipNulls();
@Inject
public SingleSegmentLoader(
SegmentPuller segmentPuller,
QueryableIndexFactory factory
)
QueryableIndexFactory factory,
File cacheDirectory)
{
this.segmentPuller = segmentPuller;
this.factory = factory;
this.cacheDirectory = cacheDirectory;
}
@Override
public Segment getSegment(DataSegment segment) throws StorageAdapterLoadingException
{
final QueryableIndex index = factory.factorize(segmentPuller.getSegmentFiles(segment));
File segmentFiles = getSegmentFiles(segment);
final QueryableIndex index = factory.factorize(segmentFiles);
return new QueryableIndexSegment(segment.getIdentifier(), index);
}
public File getSegmentFiles(DataSegment segment) throws StorageAdapterLoadingException {
File cacheFile = getCacheFile(segment);
if (cacheFile.exists()) {
long localLastModified = cacheFile.lastModified();
long remoteLastModified = segmentPuller.getLastModified(segment);
if(remoteLastModified > 0 && localLastModified >= remoteLastModified){
log.info(
"Found cacheFile[%s] with modified[%s], which is same or after remote[%s]. Using.",
cacheFile,
localLastModified,
remoteLastModified
);
return cacheFile.getParentFile();
}
}
File pulledFile = segmentPuller.getSegmentFiles(segment);
if(!cacheFile.getParentFile().mkdirs()){
log.info("Unable to make parent file[%s]", cacheFile.getParentFile());
}
if (cacheFile.exists()) {
cacheFile.delete();
}
if(pulledFile.getName().endsWith(".zip")){
unzip(pulledFile, cacheFile.getParentFile());
} else if(pulledFile.getName().endsWith(".gz")){
gunzip(pulledFile, cacheFile);
} else {
moveToCache(pulledFile, cacheFile);
}
return cacheFile.getParentFile();
}
private File getCacheFile(DataSegment segment) {
String outputKey = JOINER.join(
segment.getDataSource(),
String.format(
"%s_%s",
segment.getInterval().getStart(),
segment.getInterval().getEnd()
),
segment.getVersion(),
segment.getShardSpec().getPartitionNum()
);
return new File(cacheDirectory, outputKey);
}
private void moveToCache(File pulledFile, File cacheFile) throws StorageAdapterLoadingException {
log.info("Rename pulledFile[%s] to cacheFile[%s]", pulledFile, cacheFile);
if(!pulledFile.renameTo(cacheFile)){
log.warn("Error renaming pulledFile[%s] to cacheFile[%s]. Copying instead.", pulledFile, cacheFile);
try {
StreamUtils.copyToFileAndClose(new FileInputStream(pulledFile), cacheFile);
} catch (IOException e) {
throw new StorageAdapterLoadingException(e,"Problem moving pulledFile[%s] to cache[%s]", pulledFile, cacheFile);
}
if (!pulledFile.delete()) {
log.error("Could not delete pulledFile[%s].", pulledFile);
}
}
}
private void unzip(File pulledFile, File cacheFile) throws StorageAdapterLoadingException {
log.info("Unzipping file[%s] to [%s]", pulledFile, cacheFile);
ZipInputStream zipIn = null;
OutputStream out = null;
ZipEntry entry = null;
try {
zipIn = new ZipInputStream(new BufferedInputStream(new FileInputStream(pulledFile)));
while ((entry = zipIn.getNextEntry()) != null) {
out = new FileOutputStream(new File(cacheFile, entry.getName()));
IOUtils.copy(zipIn, out);
zipIn.closeEntry();
Closeables.closeQuietly(out);
out = null;
}
} catch(IOException e) {
throw new StorageAdapterLoadingException(e,"Problem unzipping[%s]", pulledFile);
}
finally {
Closeables.closeQuietly(out);
Closeables.closeQuietly(zipIn);
}
}
private void gunzip(File pulledFile, File cacheFile) throws StorageAdapterLoadingException {
log.info("Gunzipping file[%s] to [%s]", pulledFile, cacheFile);
try {
StreamUtils.copyToFileAndClose(
new GZIPInputStream(new FileInputStream(pulledFile)),
cacheFile
);
} catch (IOException e) {
throw new StorageAdapterLoadingException(e,"Problem gunzipping[%s]", pulledFile);
}
if (!pulledFile.delete()) {
log.error("Could not delete tmpFile[%s].", pulledFile);
}
}
@Override
public void cleanup(DataSegment segment) throws StorageAdapterLoadingException
{
segmentPuller.cleanSegmentFiles(segment);
File cacheFile = getCacheFile(segment).getParentFile();
try {
log.info("Deleting directory[%s]", cacheFile);
FileUtils.deleteDirectory(cacheFile);
}
catch (IOException e) {
throw new StorageAdapterLoadingException(e, e.getMessage());
}
}
}
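Taken together, callers now build one SingleSegmentLoader per cache directory and reuse it for every S3 load-spec type, as ServerInit and the TaskToolbox adapter above do. A minimal wiring sketch, assuming a configured RestS3Service and cache directory; the class name SegmentLoaderWiring is made up here:

package com.metamx.druid.loading;

import org.jets3t.service.impl.rest.httpclient.RestS3Service;

import java.io.File;

class SegmentLoaderWiring
{
  static SingleSegmentLoader makeLoader(RestS3Service s3Client, File cacheDir)
  {
    // One loader instance can back "s3", "s3_union" and "s3_zip" load specs alike,
    // since unzip/gunzip/rename now happens inside the loader rather than in the puller.
    return new SingleSegmentLoader(
        new S3SegmentPuller(s3Client),
        new MMappedQueryableIndexFactory(),
        cacheDir
    );
  }
}

The loader lays segment files out under the cache directory as dataSource/start_end/version/partitionNum, matching getCacheFile above.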