From d1626576c069cc9ee84c113b0bba04ed25c91207 Mon Sep 17 00:00:00 2001
From: James Estes
Date: Sat, 9 Feb 2013 22:33:00 -0700
Subject: [PATCH] Working toward making it easier to add new SegmentPullers.

1) Move the local cacheFile logic out of the S3 pullers into the SingleSegmentLoader
2) Make the S3SegmentPuller just pull down the file
3) Make the Loader do the unzip, gunzip, or rename
4) 2 and 3 make S3ZippedSegmentPuller not necessary (still there, just deprecated and empty)
5) Tweak the TaskToolbox so that the Pullers returned by getSegmentGetters behave the same as they did before
---
 .../druid/merger/common/TaskToolbox.java      |  39 +++--
 .../druid/initialization/ServerInit.java      |  12 +-
 .../metamx/druid/loading/S3SegmentPuller.java | 144 +++++-----------
 .../druid/loading/S3ZippedSegmentPuller.java  | 163 +-----------------
 .../metamx/druid/loading/SegmentPuller.java   |   2 +-
 .../druid/loading/SingleSegmentLoader.java    | 141 ++++++++++++++-
 6 files changed, 219 insertions(+), 282 deletions(-)

diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java
index 0cebe1fc91c..d775921d560 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java
@@ -21,13 +21,15 @@ package com.metamx.druid.merger.common;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.loading.MMappedQueryableIndexFactory;
 import com.metamx.druid.loading.S3SegmentPuller;
-import com.metamx.druid.loading.S3SegmentGetterConfig;
-import com.metamx.druid.loading.S3ZippedSegmentPuller;
 import com.metamx.druid.loading.SegmentPuller;
+import com.metamx.druid.loading.SegmentPusher;
+import com.metamx.druid.loading.SingleSegmentLoader;
+import com.metamx.druid.loading.StorageAdapterLoadingException;
 import com.metamx.druid.merger.common.task.Task;
 import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
-import com.metamx.druid.loading.SegmentPusher;
 import com.metamx.emitter.service.ServiceEmitter;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
 
@@ -88,19 +90,28 @@ public class TaskToolbox
 
   public Map<String, SegmentPuller> getSegmentGetters(final Task task)
   {
-    final S3SegmentGetterConfig getterConfig = new S3SegmentGetterConfig()
-    {
-      @Override
-      public File getCacheDirectory()
-      {
-        return new File(config.getTaskDir(task), "fetched_segments");
-      }
-    };
+    LoaderPullerAdapter puller = new LoaderPullerAdapter(new File(config.getTaskDir(task), "fetched_segments"));
 
     return ImmutableMap.<String, SegmentPuller>builder()
-                       .put("s3", new S3SegmentPuller(s3Client, getterConfig))
-                       .put("s3_union", new S3SegmentPuller(s3Client, getterConfig))
-                       .put("s3_zip", new S3ZippedSegmentPuller(s3Client, getterConfig))
+                       .put("s3", puller)
+                       .put("s3_union", puller)
+                       .put("s3_zip", puller)
                        .build();
   }
+
+  class LoaderPullerAdapter implements SegmentPuller{
+    private SingleSegmentLoader loader;
+    public LoaderPullerAdapter(File cacheDir){
+      loader = new SingleSegmentLoader(new S3SegmentPuller(s3Client), new MMappedQueryableIndexFactory(), cacheDir);
+    }
+    @Override
+    public File getSegmentFiles(DataSegment loadSpec) throws StorageAdapterLoadingException {
+      return loader.getSegmentFiles(loadSpec);
+    }
+
+    @Override
+    public long getLastModified(DataSegment segment) throws StorageAdapterLoadingException {
+      return -1;
+    }
+  }
 }
diff --git a/server/src/main/java/com/metamx/druid/initialization/ServerInit.java b/server/src/main/java/com/metamx/druid/initialization/ServerInit.java
index 1d727f9abe3..3a510e8b23c 100644
--- a/server/src/main/java/com/metamx/druid/initialization/ServerInit.java
+++ b/server/src/main/java/com/metamx/druid/initialization/ServerInit.java
@@ -35,7 +35,6 @@ import com.metamx.druid.query.group.GroupByQueryEngineConfig;
 import com.metamx.druid.Query;
 import com.metamx.druid.collect.StupidPool;
 import com.metamx.druid.loading.QueryableLoaderConfig;
-import com.metamx.druid.loading.S3ZippedSegmentPuller;
 import com.metamx.druid.loading.SegmentLoader;
 import com.metamx.druid.query.QueryRunnerFactory;
 import com.metamx.druid.query.group.GroupByQuery;
@@ -69,8 +68,8 @@ public class ServerInit
   {
     DelegatingSegmentLoader delegateLoader = new DelegatingSegmentLoader();
 
-    final S3SegmentPuller segmentGetter = new S3SegmentPuller(s3Client, config);
-    final S3ZippedSegmentPuller zippedGetter = new S3ZippedSegmentPuller(s3Client, config);
+    final S3SegmentPuller segmentGetter = new S3SegmentPuller(s3Client);
+
     final QueryableIndexFactory factory;
     if ("mmap".equals(config.getQueryableFactoryType())) {
       factory = new MMappedQueryableIndexFactory();
@@ -78,11 +77,12 @@ public class ServerInit
       throw new ISE("Unknown queryableFactoryType[%s]", config.getQueryableFactoryType());
     }
 
+    SingleSegmentLoader segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config.getCacheDirectory());
     delegateLoader.setLoaderTypes(
         ImmutableMap.<String, SegmentLoader>builder()
-                    .put("s3", new SingleSegmentLoader(segmentGetter, factory))
-                    .put("s3_zip", new SingleSegmentLoader(zippedGetter, factory))
-                    .build()
+            .put("s3", segmentLoader)
+            .put("s3_zip", segmentLoader)
+            .build()
     );
 
     return delegateLoader;
diff --git a/server/src/main/java/com/metamx/druid/loading/S3SegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/S3SegmentPuller.java
index 380489548d5..f85a489b1fe 100644
--- a/server/src/main/java/com/metamx/druid/loading/S3SegmentPuller.java
+++ b/server/src/main/java/com/metamx/druid/loading/S3SegmentPuller.java
@@ -25,17 +25,15 @@ import com.metamx.common.StreamUtils;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.common.s3.S3Utils;
-import org.apache.commons.io.FileUtils;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.ServiceException;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
 import org.jets3t.service.model.S3Bucket;
 import org.jets3t.service.model.S3Object;
 import org.joda.time.DateTime;
 
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
 import java.util.Map;
-import java.util.zip.GZIPInputStream;
 
 /**
  */
@@ -48,133 +46,85 @@ public class S3SegmentPuller implements SegmentPuller
   private static final String KEY = "key";
 
   private final RestS3Service s3Client;
-  private final S3SegmentGetterConfig config;
 
   @Inject
   public S3SegmentPuller(
-      RestS3Service s3Client,
-      S3SegmentGetterConfig config
+      RestS3Service s3Client
   )
   {
     this.s3Client = s3Client;
-    this.config = config;
   }
 
   @Override
   public File getSegmentFiles(DataSegment segment) throws StorageAdapterLoadingException
   {
-    Map<String, Object> loadSpec = segment.getLoadSpec();
-    String s3Bucket = MapUtils.getString(loadSpec, "bucket");
-    String s3Path = MapUtils.getString(loadSpec, "key");
+    S3Coords s3Coords = new S3Coords(segment);
 
-    log.info("Loading index at path[s3://%s/%s]", s3Bucket, s3Path);
+    log.info("Loading index at path[%s]", s3Coords);
 
-    S3Object s3Obj = null;
+    if(!isObjectInBucket(s3Coords)){
+      throw new StorageAdapterLoadingException("IndexFile[%s] does not exist.", s3Coords);
+    }
+
+    long currTime = System.currentTimeMillis();
     File tmpFile = null;
+    S3Object s3Obj = null;
+
     try {
-      if (!s3Client.isObjectInBucket(s3Bucket, s3Path)) {
-        throw new StorageAdapterLoadingException("IndexFile[s3://%s/%s] does not exist.", s3Bucket, s3Path);
-      }
+      s3Obj = s3Client.getObject(new S3Bucket(s3Coords.bucket), s3Coords.path);
+      tmpFile = File.createTempFile(s3Coords.bucket, new DateTime().toString() + s3Coords.path.replace('/', '_'));
+      log.info("Downloading file[%s] to local tmpFile[%s] for segment[%s]", s3Coords, tmpFile, segment);
 
-      File cacheFile = new File(config.getCacheDirectory(), computeCacheFilePath(s3Bucket, s3Path));
-
-      if (cacheFile.exists()) {
-        S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(s3Bucket), s3Path);
-        DateTime cacheFileLastModified = new DateTime(cacheFile.lastModified());
-        DateTime s3ObjLastModified = new DateTime(objDetails.getLastModifiedDate().getTime());
-        if (cacheFileLastModified.isAfter(s3ObjLastModified)) {
-          log.info(
-              "Found cacheFile[%s] with modified[%s], which is after s3Obj[%s]. Using.",
-              cacheFile,
-              cacheFileLastModified,
-              s3ObjLastModified
-          );
-          return cacheFile.getParentFile();
-        }
-        FileUtils.deleteDirectory(cacheFile.getParentFile());
-      }
-
-      long currTime = System.currentTimeMillis();
-
-      tmpFile = File.createTempFile(s3Bucket, new DateTime().toString());
-      log.info(
-          "Downloading file[s3://%s/%s] to local tmpFile[%s] for cacheFile[%s]",
-          s3Bucket, s3Path, tmpFile, cacheFile
-      );
-
-      s3Obj = s3Client.getObject(new S3Bucket(s3Bucket), s3Path);
       StreamUtils.copyToFileAndClose(s3Obj.getDataInputStream(), tmpFile, DEFAULT_TIMEOUT);
       final long downloadEndTime = System.currentTimeMillis();
-      log.info("Download of file[%s] completed in %,d millis", cacheFile, downloadEndTime - currTime);
+      log.info("Download of file[%s] completed in %,d millis", tmpFile, downloadEndTime - currTime);
 
-      if (!cacheFile.getParentFile().mkdirs()) {
-        log.info("Unable to make parent file[%s]", cacheFile.getParentFile());
-      }
-      cacheFile.delete();
-
-      if (s3Path.endsWith("gz")) {
-        log.info("Decompressing file[%s] to [%s]", tmpFile, cacheFile);
-        StreamUtils.copyToFileAndClose(
-            new GZIPInputStream(new FileInputStream(tmpFile)),
-            cacheFile
-        );
-        if (!tmpFile.delete()) {
-          log.error("Could not delete tmpFile[%s].", tmpFile);
-        }
-      } else {
-        log.info("Rename tmpFile[%s] to cacheFile[%s]", tmpFile, cacheFile);
-        if (!tmpFile.renameTo(cacheFile)) {
-          log.warn("Error renaming tmpFile[%s] to cacheFile[%s]. Copying instead.", tmpFile, cacheFile);
-
-          StreamUtils.copyToFileAndClose(new FileInputStream(tmpFile), cacheFile);
-          if (!tmpFile.delete()) {
-            log.error("Could not delete tmpFile[%s].", tmpFile);
-          }
-        }
-      }
-
-      long endTime = System.currentTimeMillis();
-      log.info("Local processing of file[%s] done in %,d millis", cacheFile, endTime - downloadEndTime);
-
-      return cacheFile.getParentFile();
+      return tmpFile;
     }
     catch (Exception e) {
+      if(tmpFile!=null && tmpFile.exists()){
+        tmpFile.delete();
+      }
       throw new StorageAdapterLoadingException(e, e.getMessage());
     }
     finally {
       S3Utils.closeStreamsQuietly(s3Obj);
-      if (tmpFile != null && tmpFile.exists()) {
-        log.warn("Deleting tmpFile[%s] in finally block. Why?", tmpFile);
-        tmpFile.delete();
-      }
     }
   }
 
-  private String computeCacheFilePath(String s3Bucket, String s3Path)
-  {
-    return String.format(
-        "%s/%s", s3Bucket, s3Path.endsWith("gz") ? s3Path.substring(0, s3Path.length() - ".gz".length()) : s3Path
-    );
+  private boolean isObjectInBucket(S3Coords coords) throws StorageAdapterLoadingException {
+    try {
+      return s3Client.isObjectInBucket(coords.bucket, coords.path);
+    } catch (ServiceException e) {
+      throw new StorageAdapterLoadingException(e, "Problem communicating with S3 checking bucket/path[%s]", coords);
+    }
   }
 
   @Override
-  public boolean cleanSegmentFiles(DataSegment segment) throws StorageAdapterLoadingException
-  {
-    Map<String, Object> loadSpec = segment.getLoadSpec();
-    File cacheFile = new File(
-        config.getCacheDirectory(),
-        computeCacheFilePath(MapUtils.getString(loadSpec, BUCKET), MapUtils.getString(loadSpec, KEY))
-    );
-
+  public long getLastModified(DataSegment segment) throws StorageAdapterLoadingException {
+    S3Coords coords = new S3Coords(segment);
     try {
-      final File parentFile = cacheFile.getParentFile();
-      log.info("Recursively deleting file[%s]", parentFile);
-      FileUtils.deleteDirectory(parentFile);
-    }
-    catch (IOException e) {
+      S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(coords.bucket), coords.path);
+      return objDetails.getLastModifiedDate().getTime();
+    } catch (S3ServiceException e) {
       throw new StorageAdapterLoadingException(e, e.getMessage());
     }
+  }
 
-    return true;
+  private class S3Coords {
+    String bucket;
+    String path;
+
+    public S3Coords(DataSegment segment) {
+      Map<String, Object> loadSpec = segment.getLoadSpec();
+      bucket = MapUtils.getString(loadSpec, BUCKET);
+      path = MapUtils.getString(loadSpec, KEY);
+      if(path.startsWith("/")){
+        path = path.substring(1);
+      }
+    }
+    public String toString(){
+      return String.format("s3://%s/%s", bucket, path);
+    }
   }
 }
diff --git a/server/src/main/java/com/metamx/druid/loading/S3ZippedSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/S3ZippedSegmentPuller.java
index 8fd8ebd4542..a3a7c724687 100644
--- a/server/src/main/java/com/metamx/druid/loading/S3ZippedSegmentPuller.java
+++ b/server/src/main/java/com/metamx/druid/loading/S3ZippedSegmentPuller.java
@@ -19,169 +19,14 @@
 
 package com.metamx.druid.loading;
 
-import com.google.common.io.Closeables;
-import com.metamx.common.MapUtils;
-import com.metamx.common.StreamUtils;
-import com.metamx.common.logger.Logger;
-import com.metamx.druid.client.DataSegment;
-import com.metamx.druid.common.s3.S3Utils;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
-import org.jets3t.service.model.S3Bucket;
-import org.jets3t.service.model.S3Object;
-import org.joda.time.DateTime;
-
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Map;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
 
 /**
+ * @deprecated
  */
-public class S3ZippedSegmentPuller implements SegmentPuller
+public class S3ZippedSegmentPuller extends S3SegmentPuller
 {
-  private static final Logger log = new Logger(S3ZippedSegmentPuller.class);
-
-  private static final String BUCKET = "bucket";
-  private static final String KEY = "key";
-
-  private final RestS3Service s3Client;
-  private final S3SegmentGetterConfig config;
-
-  public S3ZippedSegmentPuller(
-      RestS3Service s3Client,
-      S3SegmentGetterConfig config
-  )
-  {
-    this.s3Client = s3Client;
-    this.config = config;
-  }
-
-  @Override
-  public File getSegmentFiles(DataSegment segment) throws StorageAdapterLoadingException
-  {
-    Map<String, Object> loadSpec = segment.getLoadSpec();
-    String s3Bucket = MapUtils.getString(loadSpec, "bucket");
-    String s3Path = MapUtils.getString(loadSpec, "key");
-
-    if (s3Path.startsWith("/")) {
-      s3Path = s3Path.substring(1);
-    }
-
-    log.info("Loading index at path[s3://%s/%s]", s3Bucket, s3Path);
-
-    S3Object s3Obj = null;
-    File tmpFile = null;
-    try {
-      if (!s3Client.isObjectInBucket(s3Bucket, s3Path)) {
-        throw new StorageAdapterLoadingException("IndexFile[s3://%s/%s] does not exist.", s3Bucket, s3Path);
-      }
-
-      File cacheFile = new File(config.getCacheDirectory(), computeCacheFilePath(s3Bucket, s3Path));
-
-      if (cacheFile.exists()) {
-        S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(s3Bucket), s3Path);
-        DateTime cacheFileLastModified = new DateTime(cacheFile.lastModified());
-        DateTime s3ObjLastModified = new DateTime(objDetails.getLastModifiedDate().getTime());
-        if (cacheFileLastModified.isAfter(s3ObjLastModified)) {
-          log.info(
-              "Found cacheFile[%s] with modified[%s], which is after s3Obj[%s]. Using.",
-              cacheFile,
-              cacheFileLastModified,
-              s3ObjLastModified
-          );
-          return cacheFile;
-        }
-        FileUtils.deleteDirectory(cacheFile);
-      }
-
-      long currTime = System.currentTimeMillis();
-
-      tmpFile = File.createTempFile(s3Bucket, new DateTime().toString());
-      log.info(
-          "Downloading file[s3://%s/%s] to local tmpFile[%s] for cacheFile[%s]",
-          s3Bucket, s3Path, tmpFile, cacheFile
-      );
-
-      s3Obj = s3Client.getObject(new S3Bucket(s3Bucket), s3Path);
-      StreamUtils.copyToFileAndClose(s3Obj.getDataInputStream(), tmpFile);
-      final long downloadEndTime = System.currentTimeMillis();
-      log.info("Download of file[%s] completed in %,d millis", cacheFile, downloadEndTime - currTime);
-
-      if (cacheFile.exists()) {
-        FileUtils.deleteDirectory(cacheFile);
-      }
-      cacheFile.mkdirs();
-
-      ZipInputStream zipIn = null;
-      OutputStream out = null;
-      ZipEntry entry = null;
-      try {
-        zipIn = new ZipInputStream(new BufferedInputStream(new FileInputStream(tmpFile)));
-        while ((entry = zipIn.getNextEntry()) != null) {
-          out = new FileOutputStream(new File(cacheFile, entry.getName()));
-          IOUtils.copy(zipIn, out);
-          zipIn.closeEntry();
-          Closeables.closeQuietly(out);
-          out = null;
-        }
-      }
-      finally {
-        Closeables.closeQuietly(out);
-        Closeables.closeQuietly(zipIn);
-      }
-
-      long endTime = System.currentTimeMillis();
-      log.info("Local processing of file[%s] done in %,d millis", cacheFile, endTime - downloadEndTime);
-
-      log.info("Deleting tmpFile[%s]", tmpFile);
-      tmpFile.delete();
-
-      return cacheFile;
-    }
-    catch (Exception e) {
-      throw new StorageAdapterLoadingException(e, e.getMessage());
-    }
-    finally {
-      S3Utils.closeStreamsQuietly(s3Obj);
-      if (tmpFile != null && tmpFile.exists()) {
-        log.warn("Deleting tmpFile[%s] in finally block. Why?", tmpFile);
-        tmpFile.delete();
-      }
-    }
-  }
-
-  private String computeCacheFilePath(String s3Bucket, String s3Path)
-  {
-    return new File(String.format("%s/%s", s3Bucket, s3Path)).getParent();
-  }
-
-  @Override
-  public boolean cleanSegmentFiles(DataSegment segment) throws StorageAdapterLoadingException
-  {
-    Map<String, Object> loadSpec = segment.getLoadSpec();
-    File cacheFile = new File(
-        config.getCacheDirectory(),
-        computeCacheFilePath(
-            MapUtils.getString(loadSpec, BUCKET),
-            MapUtils.getString(loadSpec, KEY)
-        )
-    );
-
-    try {
-      log.info("Deleting directory[%s]", cacheFile);
-      FileUtils.deleteDirectory(cacheFile);
-    }
-    catch (IOException e) {
-      throw new StorageAdapterLoadingException(e, e.getMessage());
-    }
-
-    return true;
+  public S3ZippedSegmentPuller(RestS3Service s3Client) {
+    super(s3Client);
   }
 }
diff --git a/server/src/main/java/com/metamx/druid/loading/SegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/SegmentPuller.java
index 9cba65f425c..3e5f1b1a161 100644
--- a/server/src/main/java/com/metamx/druid/loading/SegmentPuller.java
+++ b/server/src/main/java/com/metamx/druid/loading/SegmentPuller.java
@@ -29,5 +29,5 @@ import java.util.Map;
 public interface SegmentPuller
 {
   public File getSegmentFiles(DataSegment loadSpec) throws StorageAdapterLoadingException;
-  public boolean cleanSegmentFiles(DataSegment loadSpec) throws StorageAdapterLoadingException;
+  long getLastModified(DataSegment segment) throws StorageAdapterLoadingException;
 }
diff --git a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java
index 19c3981e988..ae62cfda1e9 100644
--- a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java
+++ b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java
@@ -19,40 +19,171 @@
 
 package com.metamx.druid.loading;
 
+import com.google.common.base.Joiner;
+import com.google.common.io.Closeables;
 import com.google.inject.Inject;
+import com.metamx.common.StreamUtils;
+import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.index.QueryableIndex;
 import com.metamx.druid.index.QueryableIndexSegment;
 import com.metamx.druid.index.Segment;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+
+import java.io.*;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
 
 /**
  */
 public class SingleSegmentLoader implements SegmentLoader
 {
+  private static final Logger log = new Logger(SingleSegmentLoader.class);
+
   private final SegmentPuller segmentPuller;
   private final QueryableIndexFactory factory;
+  private File cacheDirectory;
+  private static final Joiner JOINER = Joiner.on("/").skipNulls();
 
   @Inject
   public SingleSegmentLoader(
-      SegmentPuller segmentPuller,
-      QueryableIndexFactory factory
-  )
+      SegmentPuller segmentPuller,
+      QueryableIndexFactory factory,
+      File cacheDirectory)
   {
     this.segmentPuller = segmentPuller;
     this.factory = factory;
+    this.cacheDirectory = cacheDirectory;
   }
 
   @Override
   public Segment getSegment(DataSegment segment) throws StorageAdapterLoadingException
   {
-    final QueryableIndex index = factory.factorize(segmentPuller.getSegmentFiles(segment));
+    File segmentFiles = getSegmentFiles(segment);
+    final QueryableIndex index = factory.factorize(segmentFiles);
 
     return new QueryableIndexSegment(segment.getIdentifier(), index);
   }
 
+  public File getSegmentFiles(DataSegment segment) throws StorageAdapterLoadingException {
+    File cacheFile = getCacheFile(segment);
+    if (cacheFile.exists()) {
+      long localLastModified = cacheFile.lastModified();
+      long remoteLastModified = segmentPuller.getLastModified(segment);
+      if(remoteLastModified > 0 && localLastModified >= remoteLastModified){
+        log.info(
+            "Found cacheFile[%s] with modified[%s], which is same or after remote[%s]. Using.",
+            cacheFile,
+            localLastModified,
+            remoteLastModified
+        );
+        return cacheFile.getParentFile();
+      }
+    }
+
+    File pulledFile = segmentPuller.getSegmentFiles(segment);
+
+    if(!cacheFile.getParentFile().mkdirs()){
+      log.info("Unable to make parent file[%s]", cacheFile.getParentFile());
+    }
+    if (cacheFile.exists()) {
+      cacheFile.delete();
+    }
+
+    if(pulledFile.getName().endsWith(".zip")){
+      unzip(pulledFile, cacheFile.getParentFile());
+    } else if(pulledFile.getName().endsWith(".gz")){
+      gunzip(pulledFile, cacheFile);
+    } else {
+      moveToCache(pulledFile, cacheFile);
+    }
+
+    return cacheFile.getParentFile();
+  }
+
+  private File getCacheFile(DataSegment segment) {
+    String outputKey = JOINER.join(
+        segment.getDataSource(),
+        String.format(
+            "%s_%s",
+            segment.getInterval().getStart(),
+            segment.getInterval().getEnd()
+        ),
+        segment.getVersion(),
+        segment.getShardSpec().getPartitionNum()
+    );
+
+    return new File(cacheDirectory, outputKey);
+  }
+
+  private void moveToCache(File pulledFile, File cacheFile) throws StorageAdapterLoadingException {
+    log.info("Rename pulledFile[%s] to cacheFile[%s]", pulledFile, cacheFile);
+    if(!pulledFile.renameTo(cacheFile)){
+      log.warn("Error renaming pulledFile[%s] to cacheFile[%s]. Copying instead.", pulledFile, cacheFile);
+
+      try {
+        StreamUtils.copyToFileAndClose(new FileInputStream(pulledFile), cacheFile);
+      } catch (IOException e) {
+        throw new StorageAdapterLoadingException(e, "Problem moving pulledFile[%s] to cache[%s]", pulledFile, cacheFile);
+      }
+      if (!pulledFile.delete()) {
+        log.error("Could not delete pulledFile[%s].", pulledFile);
+      }
+    }
+  }
+
+  private void unzip(File pulledFile, File cacheFile) throws StorageAdapterLoadingException {
+    log.info("Unzipping file[%s] to [%s]", pulledFile, cacheFile);
+    ZipInputStream zipIn = null;
+    OutputStream out = null;
+    ZipEntry entry = null;
+    try {
+      zipIn = new ZipInputStream(new BufferedInputStream(new FileInputStream(pulledFile)));
+      while ((entry = zipIn.getNextEntry()) != null) {
+        out = new FileOutputStream(new File(cacheFile, entry.getName()));
+        IOUtils.copy(zipIn, out);
+        zipIn.closeEntry();
+        Closeables.closeQuietly(out);
+        out = null;
+      }
+    } catch(IOException e) {
+      throw new StorageAdapterLoadingException(e, "Problem unzipping[%s]", pulledFile);
+    }
+    finally {
+      Closeables.closeQuietly(out);
+      Closeables.closeQuietly(zipIn);
+    }
+  }
+
+  private void gunzip(File pulledFile, File cacheFile) throws StorageAdapterLoadingException {
+    log.info("Gunzipping file[%s] to [%s]", pulledFile, cacheFile);
+    try {
+      StreamUtils.copyToFileAndClose(
+          new GZIPInputStream(new FileInputStream(pulledFile)),
+          cacheFile
+      );
+    } catch (IOException e) {
+      throw new StorageAdapterLoadingException(e, "Problem gunzipping[%s]", pulledFile);
+    }
+    if (!pulledFile.delete()) {
+      log.error("Could not delete tmpFile[%s].", pulledFile);
+    }
+  }
+
   @Override
   public void cleanup(DataSegment segment) throws StorageAdapterLoadingException
   {
-    segmentPuller.cleanSegmentFiles(segment);
+    File cacheFile = getCacheFile(segment).getParentFile();
+
+    try {
+      log.info("Deleting directory[%s]", cacheFile);
+      FileUtils.deleteDirectory(cacheFile);
+    }
+    catch (IOException e) {
+      throw new StorageAdapterLoadingException(e, e.getMessage());
+    }
   }
+
 }
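
For reference (not part of the patch): with SegmentPuller reduced to getSegmentFiles() and getLastModified(), a new deep-storage puller carries no cache, unzip, gunzip, or rename logic of its own; SingleSegmentLoader owns all of that. A minimal, hypothetical local-filesystem puller against the revised interface might look like the sketch below. The class name and the "path" loadSpec key are illustrative assumptions, not code from this change.

    package com.metamx.druid.loading;

    import com.metamx.common.MapUtils;
    import com.metamx.druid.client.DataSegment;

    import java.io.File;

    public class LocalFileSegmentPuller implements SegmentPuller
    {
      @Override
      public File getSegmentFiles(DataSegment segment) throws StorageAdapterLoadingException
      {
        // Resolve the segment to a local file; SingleSegmentLoader will unzip/gunzip/rename it into its cache.
        File file = new File(MapUtils.getString(segment.getLoadSpec(), "path"));
        if (!file.exists()) {
          throw new StorageAdapterLoadingException("IndexFile[%s] does not exist.", file);
        }
        return file;
      }

      @Override
      public long getLastModified(DataSegment segment) throws StorageAdapterLoadingException
      {
        // Lets SingleSegmentLoader decide whether its cached copy is still fresh.
        return new File(MapUtils.getString(segment.getLoadSpec(), "path")).lastModified();
      }
    }

Wiring would mirror ServerInit: wrap the puller in a SingleSegmentLoader and register that loader for the new loadSpec type in DelegatingSegmentLoader.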