From 6d407e86772a39ad47a3b5abae9b0fabc1cc638c Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 17 Feb 2015 16:37:28 -0800 Subject: [PATCH] Add URI handling to SegmentPullers * Requires https://github.com/druid-io/druid-api/pull/37 * Requires https://github.com/metamx/java-util/pull/22 * Moves the puller logic to use a more standard workflow going through java-util helpers instead of re-writing the handlers for each impl * General workflow goes like this: 1) LoadSpec makes sure the correct Puller is called with the correct parameters. 2) The Puller sets up general information like how to make an InputStream, how to find a file name (for .gz files for example), and when to retry. 3) CompressionUtils does most of the heavy lifting when it can --- .../introspect/GuiceInjectableValues.java | 12 +- .../cassandra/CassandraDataSegmentPuller.java | 128 ++++--- .../cassandra/CassandraDataSegmentPusher.java | 2 +- .../cassandra/CassandraDruidModule.java | 43 ++- .../storage/cassandra/CassandraLoadSpec.java | 54 +++ extensions/hdfs-storage/pom.xml | 36 +- .../storage/hdfs/HdfsDataSegmentPuller.java | 327 ++++++++++++++++-- .../storage/hdfs/HdfsDataSegmentPusher.java | 2 +- .../io/druid/storage/hdfs/HdfsLoadSpec.java | 61 ++++ .../storage/hdfs/HdfsStorageDruidModule.java | 31 +- .../loading/HdfsDataSegmentPullerTest.java | 220 ++++++++++++ .../druid/storage/s3/S3DataSegmentPuller.java | 270 ++++++++++++--- .../druid/storage/s3/S3DataSegmentPusher.java | 2 +- .../java/io/druid/storage/s3/S3LoadSpec.java | 90 +++++ .../storage/s3/S3StorageDruidModule.java | 33 +- .../java/io/druid/storage/s3/S3Utils.java | 43 ++- .../indexing/common/SegmentLoaderFactory.java | 6 +- .../indexing/common/TaskToolboxTest.java | 14 +- .../IngestSegmentFirehoseFactoryTest.java | 51 ++- .../indexing/overlord/TaskLifecycleTest.java | 12 +- .../worker/WorkerTaskMonitorTest.java | 13 +- pom.xml | 4 +- .../segment/SegmentMissingException.java | 4 + .../guice/LocalDataStorageDruidModule.java | 55 ++- 
.../loading/LocalDataSegmentPuller.java | 197 +++++++++-- .../loading/LocalDataSegmentPusher.java | 2 +- .../druid/segment/loading/LocalLoadSpec.java | 64 ++++ ...va => SegmentLoaderLocalCacheManager.java} | 50 ++- .../druid/segment/loading/LoadSpecTest.java | 108 ++++++ .../loading/LocalDataSegmentPullerTest.java | 152 ++++++++ 30 files changed, 1805 insertions(+), 281 deletions(-) create mode 100644 extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraLoadSpec.java create mode 100644 extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java create mode 100644 extensions/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentPullerTest.java create mode 100644 extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java create mode 100644 server/src/main/java/io/druid/segment/loading/LocalLoadSpec.java rename server/src/main/java/io/druid/segment/loading/{OmniSegmentLoader.java => SegmentLoaderLocalCacheManager.java} (79%) create mode 100644 server/src/test/java/io/druid/segment/loading/LoadSpecTest.java create mode 100644 server/src/test/java/io/druid/segment/loading/LocalDataSegmentPullerTest.java diff --git a/common/src/main/java/com/fasterxml/jackson/databind/introspect/GuiceInjectableValues.java b/common/src/main/java/com/fasterxml/jackson/databind/introspect/GuiceInjectableValues.java index d1480924c9e..db300741222 100644 --- a/common/src/main/java/com/fasterxml/jackson/databind/introspect/GuiceInjectableValues.java +++ b/common/src/main/java/com/fasterxml/jackson/databind/introspect/GuiceInjectableValues.java @@ -22,6 +22,9 @@ import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.InjectableValues; import com.google.inject.Injector; import com.google.inject.Key; +import com.metamx.common.IAE; + +import java.lang.reflect.Type; /** */ @@ -36,6 +39,13 @@ public class GuiceInjectableValues extends InjectableValues Object valueId, 
DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance ) { - return injector.getInstance((Key) valueId); + // From the docs: "Object that identifies value to inject; may be a simple name or more complex identifier object, + // whatever provider needs" + // Currently we should only be dealing with `Key` instances, and anything more advanced should be handled with + // great care + if(valueId instanceof Key){ + return injector.getInstance((Key) valueId); + } + throw new IAE("Unknown class type [%s] for valueId [%s]", valueId.getClass().getCanonicalName(), valueId.toString()); } } diff --git a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java index b643ba4f590..3b33cea6b10 100644 --- a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java +++ b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java @@ -17,79 +17,105 @@ package io.druid.storage.cassandra; -import com.google.common.io.Files; +import com.google.common.base.Predicates; import com.google.inject.Inject; +import com.metamx.common.CompressionUtils; import com.metamx.common.ISE; +import com.metamx.common.RetryUtils; import com.metamx.common.logger.Logger; import com.netflix.astyanax.recipes.storage.ChunkedStorage; import com.netflix.astyanax.recipes.storage.ObjectMetadata; import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; -import io.druid.utils.CompressionUtils; import org.apache.commons.io.FileUtils; import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; import java.io.OutputStream; +import java.util.concurrent.Callable; /** * Cassandra Segment Puller - * - * @author boneill42 */ public class CassandraDataSegmentPuller 
extends CassandraStorage implements DataSegmentPuller { - private static final Logger log = new Logger(CassandraDataSegmentPuller.class); - private static final int CONCURRENCY = 10; - private static final int BATCH_SIZE = 10; + private static final Logger log = new Logger(CassandraDataSegmentPuller.class); + private static final int CONCURRENCY = 10; + private static final int BATCH_SIZE = 10; @Inject - public CassandraDataSegmentPuller(CassandraDataSegmentConfig config) - { - super(config); - } + public CassandraDataSegmentPuller(CassandraDataSegmentConfig config) + { + super(config); + } - @Override - public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException - { - String key = (String) segment.getLoadSpec().get("key"); - log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir); + @Override + public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException + { + String key = (String) segment.getLoadSpec().get("key"); + getSegmentFiles(key, outDir); + } + public com.metamx.common.FileUtils.FileCopyResult getSegmentFiles(final String key, final File outDir) throws SegmentLoadingException{ + log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir); + if (!outDir.exists()) { + outDir.mkdirs(); + } - if (!outDir.exists()) - { - outDir.mkdirs(); - } + if (!outDir.isDirectory()) { + throw new ISE("outDir[%s] must be a directory.", outDir); + } - if (!outDir.isDirectory()) - { - throw new ISE("outDir[%s] must be a directory.", outDir); - } + long startTime = System.currentTimeMillis(); + final File tmpFile = new File(outDir, "index.zip"); + log.info("Pulling to temporary local cache [%s]", tmpFile.getAbsolutePath()); - long startTime = System.currentTimeMillis(); - ObjectMetadata meta = null; - final File outFile = new File(outDir, "index.zip"); - try - { - try - { - log.info("Writing to [%s]", outFile.getAbsolutePath()); - OutputStream os = 
Files.newOutputStreamSupplier(outFile).getOutput(); - meta = ChunkedStorage - .newReader(indexStorage, key, os) - .withBatchSize(BATCH_SIZE) - .withConcurrencyLevel(CONCURRENCY) - .call(); - os.close(); - CompressionUtils.unzip(outFile, outDir); - } catch (Exception e) - { - FileUtils.deleteDirectory(outDir); - } - } catch (Exception e) - { - throw new SegmentLoadingException(e, e.getMessage()); - } - log.info("Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime, - meta.getObjectSize()); - } + final com.metamx.common.FileUtils.FileCopyResult localResult; + try { + localResult = RetryUtils.retry( + new Callable() + { + @Override + public com.metamx.common.FileUtils.FileCopyResult call() throws Exception + { + try (OutputStream os = new FileOutputStream(tmpFile)) { + final ObjectMetadata meta = ChunkedStorage + .newReader(indexStorage, key, os) + .withBatchSize(BATCH_SIZE) + .withConcurrencyLevel(CONCURRENCY) + .call(); + } + return new com.metamx.common.FileUtils.FileCopyResult(tmpFile); + } + }, + Predicates.alwaysTrue(), + 10 + ); + }catch (Exception e){ + throw new SegmentLoadingException(e, "Unable to copy key [%s] to file [%s]", key, tmpFile.getAbsolutePath()); + } + try{ + final com.metamx.common.FileUtils.FileCopyResult result = CompressionUtils.unzip(tmpFile, outDir); + log.info( + "Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime, + result.size() + ); + return result; + } + catch (Exception e) { + try { + FileUtils.deleteDirectory(outDir); + } + catch (IOException e1) { + log.error(e1, "Error clearing segment directory [%s]", outDir.getAbsolutePath()); + e.addSuppressed(e1); + } + throw new SegmentLoadingException(e, e.getMessage()); + } finally { + if(!tmpFile.delete()){ + log.warn("Could not delete cache file at [%s]", tmpFile.getAbsolutePath()); + } + } + } } diff --git 
a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java index a6074985035..06a7dbedffe 100644 --- a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java +++ b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; +import com.metamx.common.CompressionUtils; import com.metamx.common.logger.Logger; import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.recipes.storage.ChunkedStorage; @@ -28,7 +29,6 @@ import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; -import io.druid.utils.CompressionUtils; import java.io.File; import java.io.FileInputStream; diff --git a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDruidModule.java b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDruidModule.java index 0630b645dc2..3d3852cf55d 100644 --- a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDruidModule.java +++ b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDruidModule.java @@ -17,7 +17,7 @@ package io.druid.storage.cassandra; -import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.core.Version; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Key; @@ -34,24 +34,47 @@ import java.util.List; */ public class CassandraDruidModule implements DruidModule { - @Override - public List getJacksonModules() - { - return 
ImmutableList.of(); - } + public static final String SCHEME = "c*"; @Override public void configure(Binder binder) { Binders.dataSegmentPullerBinder(binder) - .addBinding("c*") - .to(CassandraDataSegmentPuller.class) - .in(LazySingleton.class); + .addBinding(SCHEME) + .to(CassandraDataSegmentPuller.class) + .in(LazySingleton.class); PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class)) - .addBinding("c*") + .addBinding(SCHEME) .to(CassandraDataSegmentPusher.class) .in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.storage", CassandraDataSegmentConfig.class); } + + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new com.fasterxml.jackson.databind.Module() + { + @Override + public String getModuleName() + { + return "DruidCassandraStorage-" + System.identityHashCode(this); + } + + @Override + public Version version() + { + return Version.unknownVersion(); + } + + @Override + public void setupModule(SetupContext context) + { + context.registerSubtypes(CassandraLoadSpec.class); + } + } + ); + } } diff --git a/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraLoadSpec.java b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraLoadSpec.java new file mode 100644 index 00000000000..5159dae172a --- /dev/null +++ b/extensions/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraLoadSpec.java @@ -0,0 +1,54 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.storage.cassandra; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import io.druid.segment.loading.LoadSpec; +import io.druid.segment.loading.SegmentLoadingException; + +import java.io.File; + +/** + * + */ +@JsonTypeName(CassandraDruidModule.SCHEME) +public class CassandraLoadSpec implements LoadSpec +{ + @JsonProperty + private final String key; + private final CassandraDataSegmentPuller puller; + + @JsonCreator + public CassandraLoadSpec( + @JacksonInject CassandraDataSegmentPuller puller, + @JsonProperty("key") String key + ) + { + this.puller = puller; + this.key = key; + } + + @Override + public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException + { + return new LoadSpecResult(puller.getSegmentFiles(key, outDir).size()); + } +} diff --git a/extensions/hdfs-storage/pom.xml b/extensions/hdfs-storage/pom.xml index fde319f0196..0d487e6e169 100644 --- a/extensions/hdfs-storage/pom.xml +++ b/extensions/hdfs-storage/pom.xml @@ -63,13 +63,39 @@ commons-io commons-io - + - junit - junit - test + junit + junit + test - + + io.druid + druid-server + ${parent.version} + test + + + org.apache.hadoop + hadoop-hdfs + 2.3.0 + tests + test + + + org.apache.hadoop + hadoop-common + 2.3.0 + tests + test + + + org.apache.hadoop + hadoop-hdfs + 2.3.0 + test + + diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java index 2927cbdafe5..a03a3372f47 100644 --- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java +++ 
b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java @@ -17,23 +17,144 @@ package io.druid.storage.hdfs; +import com.google.common.base.Predicate; +import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; import com.google.inject.Inject; +import com.metamx.common.CompressionUtils; +import com.metamx.common.FileUtils; +import com.metamx.common.IAE; +import com.metamx.common.RetryUtils; +import com.metamx.common.UOE; +import com.metamx.common.logger.Logger; import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.segment.loading.URIDataPuller; import io.druid.timeline.DataSegment; -import io.druid.utils.CompressionUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import javax.tools.FileObject; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Reader; +import java.io.Writer; +import java.net.URI; +import java.util.ArrayList; +import java.util.concurrent.Callable; /** */ -public class HdfsDataSegmentPuller implements DataSegmentPuller +public class HdfsDataSegmentPuller implements DataSegmentPuller, URIDataPuller { + /** + * FileObject.getLastModified and FileObject.delete don't throw IOException. 
This allows us to wrap those calls + */ + public static class HdfsIOException extends RuntimeException + { + private final IOException cause; + + public HdfsIOException(IOException ex) + { + super(ex); + this.cause = ex; + } + + protected IOException getIOException() + { + return cause; + } + } + + + public static FileObject buildFileObject(final URI uri, final Configuration config) + { + return buildFileObject(uri, config, false); + } + + public static FileObject buildFileObject(final URI uri, final Configuration config, final Boolean overwrite) + { + return new FileObject() + { + final Path path = new Path(uri); + + @Override + public URI toUri() + { + return uri; + } + + @Override + public String getName() + { + return path.getName(); + } + + @Override + public InputStream openInputStream() throws IOException + { + final FileSystem fs = path.getFileSystem(config); + return fs.open(path); + } + + @Override + public OutputStream openOutputStream() throws IOException + { + final FileSystem fs = path.getFileSystem(config); + return fs.create(path, overwrite); + } + + @Override + public Reader openReader(boolean ignoreEncodingErrors) throws IOException + { + throw new UOE("HDFS Reader not supported"); + } + + @Override + public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException + { + throw new UOE("HDFS CharSequence not supported"); + } + + @Override + public Writer openWriter() throws IOException + { + throw new UOE("HDFS Writer not supported"); + } + + @Override + public long getLastModified() + { + try { + final FileSystem fs = path.getFileSystem(config); + return fs.getFileStatus(path).getModificationTime(); + } + catch (IOException ex) { + throw new HdfsIOException(ex); + } + } + + @Override + public boolean delete() + { + try { + final FileSystem fs = path.getFileSystem(config); + return fs.delete(path, false); + } + catch (IOException ex) { + throw new HdfsIOException(ex); + } + } + }; + } + + private static final Logger log = new 
Logger(HdfsDataSegmentPuller.class); private final Configuration config; @Inject @@ -42,46 +163,190 @@ public class HdfsDataSegmentPuller implements DataSegmentPuller this.config = config; } + @Override public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException { - final Path path = getPath(segment); + getSegmentFiles(getPath(segment), dir); + } - final FileSystem fs = checkPathAndGetFilesystem(path); + public FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) throws SegmentLoadingException + { + final LocalFileSystem localFileSystem = new LocalFileSystem(); + try { + final FileSystem fs = path.getFileSystem(config); + if (fs.isDirectory(path)) { - if (path.getName().endsWith(".zip")) { - try { - try (FSDataInputStream in = fs.open(path)) { - CompressionUtils.unzip(in, dir); + // -------- directory --------- + + try { + return RetryUtils.retry( + new Callable() + { + @Override + public FileUtils.FileCopyResult call() throws Exception + { + if (!fs.exists(path)) { + throw new SegmentLoadingException("No files found at [%s]", path.toString()); + } + + final RemoteIterator children = fs.listFiles(path, false); + final ArrayList localChildren = new ArrayList<>(); + final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult(); + while (children.hasNext()) { + final LocatedFileStatus child = children.next(); + final Path childPath = child.getPath(); + final String fname = childPath.getName(); + if (fs.isDirectory(childPath)) { + log.warn("[%s] is a child directory, skipping", childPath.toString()); + } else { + final File outFile = new File(outDir, fname); + + // Actual copy + fs.copyToLocalFile(childPath, new Path(outFile.toURI())); + result.addFile(outFile); + } + } + log.info( + "Copied %d bytes from [%s] to [%s]", + result.size(), + path.toString(), + outDir.getAbsolutePath() + ); + return result; + } + + }, + shouldRetryPredicate(), + 10 + ); } + catch (Exception e) { + throw Throwables.propagate(e); 
+ } + } else if (CompressionUtils.isZip(path.getName())) { + + // -------- zip --------- + + final FileUtils.FileCopyResult result = CompressionUtils.unzip( + new ByteSource() + { + @Override + public InputStream openStream() throws IOException + { + return getInputStream(path); + } + }, outDir, shouldRetryPredicate(), false + ); + + log.info( + "Unzipped %d bytes from [%s] to [%s]", + result.size(), + path.toString(), + outDir.getAbsolutePath() + ); + + return result; + } else if (CompressionUtils.isGz(path.getName())) { + + // -------- gzip --------- + + final String fname = path.getName(); + final File outFile = new File(outDir, CompressionUtils.getGzBaseName(fname)); + final FileUtils.FileCopyResult result = CompressionUtils.gunzip( + new ByteSource() + { + @Override + public InputStream openStream() throws IOException + { + return getInputStream(path); + } + }, + outFile + ); + + log.info( + "Gunzipped %d bytes from [%s] to [%s]", + result.size(), + path.toString(), + outFile.getAbsolutePath() + ); + return result; + } else { + throw new SegmentLoadingException("Do not know how to handle file type at [%s]", path.toString()); } - catch (IOException e) { - throw new SegmentLoadingException(e, "Some IOException"); - } - } else { - throw new SegmentLoadingException("Unknown file type[%s]", path); } + catch (IOException e) { + throw new SegmentLoadingException(e, "Error loading [%s]", path.toString()); + } + } + + public FileUtils.FileCopyResult getSegmentFiles(URI uri, File outDir) throws SegmentLoadingException + { + if (!uri.getScheme().equalsIgnoreCase(HdfsStorageDruidModule.SCHEME)) { + throw new SegmentLoadingException("Don't know how to load SCHEME for URI [%s]", uri.toString()); + } + return getSegmentFiles(new Path(uri), outDir); + } + + public InputStream getInputStream(Path path) throws IOException + { + return buildFileObject(path.toUri(), config).openInputStream(); + } + + @Override + public InputStream getInputStream(URI uri) throws IOException + { + 
if (!uri.getScheme().equalsIgnoreCase(HdfsStorageDruidModule.SCHEME)) { + throw new IAE("Don't know how to load SCHEME [%s] for URI [%s]", uri.getScheme(), uri.toString()); + } + return buildFileObject(uri, config).openInputStream(); + } + + /** + * Return the "version" (aka last modified timestamp) of the URI + * + * @param uri The URI of interest + * + * @return The last modified timestamp of the uri in String format + * + * @throws IOException + */ + @Override + public String getVersion(URI uri) throws IOException + { + try { + return String.format("%d", buildFileObject(uri, config).getLastModified()); + } + catch (HdfsIOException ex) { + throw ex.getIOException(); + } + } + + @Override + public Predicate shouldRetryPredicate() + { + return new Predicate() + { + @Override + public boolean apply(Throwable input) + { + if (input == null) { + return false; + } + if (input instanceof HdfsIOException) { + return true; + } + if (input instanceof IOException) { + return true; + } + return apply(input.getCause()); + } + }; } private Path getPath(DataSegment segment) { return new Path(String.valueOf(segment.getLoadSpec().get("path"))); } - - private FileSystem checkPathAndGetFilesystem(Path path) throws SegmentLoadingException - { - FileSystem fs; - try { - fs = path.getFileSystem(config); - - if (!fs.exists(path)) { - throw new SegmentLoadingException("Path[%s] doesn't exist.", path); - } - - return fs; - } - catch (IOException e) { - throw new SegmentLoadingException(e, "Problems interacting with filesystem[%s].", path); - } - } } diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java index b1168ccfe70..6180b65f83f 100644 --- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -22,12 +22,12 @@ import 
com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSink; import com.google.common.io.ByteSource; import com.google.inject.Inject; +import com.metamx.common.CompressionUtils; import com.metamx.common.logger.Logger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; -import io.druid.utils.CompressionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java new file mode 100644 index 00000000000..9c86846314e --- /dev/null +++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java @@ -0,0 +1,61 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.druid.storage.hdfs; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.metamx.common.ISE; +import io.druid.segment.loading.DataSegmentPuller; +import io.druid.segment.loading.LoadSpec; +import io.druid.segment.loading.SegmentLoadingException; +import org.apache.hadoop.fs.Path; + +import java.io.File; +import java.util.Map; + +/** + * + */ +@JsonTypeName(HdfsStorageDruidModule.SCHEME) +public class HdfsLoadSpec implements LoadSpec +{ + private final Path path; + final HdfsDataSegmentPuller puller; + @JsonCreator + public HdfsLoadSpec( + @JacksonInject HdfsDataSegmentPuller puller, + @JsonProperty(value = "path", required = true) String path + ){ + Preconditions.checkNotNull(path); + this.path = new Path(path); + this.puller = puller; + } + @JsonProperty("path") + public final String getPathString(){ + return path.toString(); + } + + @Override + public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException + { + return new LoadSpecResult(puller.getSegmentFiles(path, outDir).size()); + } +} diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java index fca37d71358..6707cdf97f5 100644 --- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java +++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java @@ -17,6 +17,7 @@ package io.druid.storage.hdfs; +import com.fasterxml.jackson.core.Version; import com.fasterxml.jackson.databind.Module; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; @@ -36,6 +37,7 @@ import java.util.Properties; */ public class HdfsStorageDruidModule 
implements DruidModule { + public static final String SCHEME = "hdfs"; private Properties props = null; @Inject @@ -47,15 +49,36 @@ public class HdfsStorageDruidModule implements DruidModule @Override public List getJacksonModules() { - return ImmutableList.of(); + return ImmutableList.of( + new Module() + { + @Override + public String getModuleName() + { + return "DruidHDFSStorage-" + System.identityHashCode(this); + } + + @Override + public Version version() + { + return Version.unknownVersion(); + } + + @Override + public void setupModule(SetupContext context) + { + context.registerSubtypes(HdfsLoadSpec.class); + } + } + ); } @Override public void configure(Binder binder) { - Binders.dataSegmentPullerBinder(binder).addBinding("hdfs").to(HdfsDataSegmentPuller.class).in(LazySingleton.class); - Binders.dataSegmentPusherBinder(binder).addBinding("hdfs").to(HdfsDataSegmentPusher.class).in(LazySingleton.class); - Binders.dataSegmentKillerBinder(binder).addBinding("hdfs").to(HdfsDataSegmentKiller.class).in(LazySingleton.class); + Binders.dataSegmentPullerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentPuller.class).in(LazySingleton.class); + Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentPusher.class).in(LazySingleton.class); + Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentKiller.class).in(LazySingleton.class); final Configuration conf = new Configuration(); if (props != null) { diff --git a/extensions/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentPullerTest.java b/extensions/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentPullerTest.java new file mode 100644 index 00000000000..400270efa2a --- /dev/null +++ b/extensions/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentPullerTest.java @@ -0,0 +1,220 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.segment.loading; + +import com.google.common.io.ByteStreams; +import com.metamx.common.CompressionUtils; +import com.metamx.common.StringUtils; +import io.druid.storage.hdfs.HdfsDataSegmentPuller; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.nio.file.Files; +import java.util.zip.GZIPOutputStream; + +/** + * + */ +public class HdfsDataSegmentPullerTest +{ + private static MiniDFSCluster miniCluster; + private static File hdfsTmpDir; + private static URI uriBase; + private static Path filePath = new Path("/tmp/foo"); + private static String pathContents = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. 
Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum"; + private static byte[] pathByteContents = StringUtils.toUtf8(pathContents); + private static Configuration conf; + + @BeforeClass + public static void setupStatic() throws IOException, ClassNotFoundException + { + hdfsTmpDir = File.createTempFile("hdfsHandlerTest", "dir"); + hdfsTmpDir.deleteOnExit(); + if (!hdfsTmpDir.delete()) { + throw new IOException(String.format("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath())); + } + conf = new Configuration(true); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath()); + miniCluster = new MiniDFSCluster.Builder(conf).build(); + uriBase = miniCluster.getURI(0); + + final File tmpFile = File.createTempFile("hdfsHandlerTest", ".data"); + tmpFile.delete(); + try { + tmpFile.deleteOnExit(); + Files.copy(new ByteArrayInputStream(pathByteContents), tmpFile.toPath()); + try (OutputStream stream = miniCluster.getFileSystem().create(filePath)) { + Files.copy(tmpFile.toPath(), stream); + } + } + finally { + tmpFile.delete(); + } + } + + @AfterClass + public static void tearDownStatic() throws IOException + { + if (miniCluster != null) { + miniCluster.shutdown(true); + } + } + + + private HdfsDataSegmentPuller puller; + + @Before + public void setUp() + { + puller = new HdfsDataSegmentPuller(conf); + } + + @Test + public void testZip() throws IOException, SegmentLoadingException + { + final File tmpDir = com.google.common.io.Files.createTempDir(); + tmpDir.deleteOnExit(); + final File tmpFile = File.createTempFile("zipContents", ".txt", tmpDir); + tmpFile.deleteOnExit(); + + final Path zipPath = new Path("/tmp/testZip.zip"); + + final File outTmpDir = com.google.common.io.Files.createTempDir(); + outTmpDir.deleteOnExit(); + + final URI uri = URI.create(uriBase.toString() + zipPath.toString()); + + tmpFile.deleteOnExit(); + try (final OutputStream stream = new FileOutputStream(tmpFile)) 
{ + ByteStreams.copy(new ByteArrayInputStream(pathByteContents), stream); + } + Assert.assertTrue(tmpFile.exists()); + + final File outFile = new File(outTmpDir, tmpFile.getName()); + outFile.delete(); + + try (final OutputStream stream = miniCluster.getFileSystem().create(zipPath)) { + CompressionUtils.zip(tmpDir, stream); + } + try { + Assert.assertFalse(outFile.exists()); + puller.getSegmentFiles(uri, outTmpDir); + Assert.assertTrue(outFile.exists()); + + Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath())); + } + finally { + if (tmpFile.exists()) { + tmpFile.delete(); + } + if (outFile.exists()) { + outFile.delete(); + } + if (outTmpDir.exists()) { + outTmpDir.delete(); + } + if (tmpDir.exists()) { + tmpDir.delete(); + } + } + } + + @Test + public void testGZ() throws IOException, SegmentLoadingException + { + final Path zipPath = new Path("/tmp/testZip.gz"); + + final File outTmpDir = com.google.common.io.Files.createTempDir(); + outTmpDir.deleteOnExit(); + final File outFile = new File(outTmpDir, "testZip"); + outFile.delete(); + + final URI uri = URI.create(uriBase.toString() + zipPath.toString()); + + try (final OutputStream outputStream = miniCluster.getFileSystem().create(zipPath)) { + try (final OutputStream gzStream = new GZIPOutputStream(outputStream)) { + try (final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) { + ByteStreams.copy(inputStream, gzStream); + } + } + } + try { + Assert.assertFalse(outFile.exists()); + puller.getSegmentFiles(uri, outTmpDir); + Assert.assertTrue(outFile.exists()); + + Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath())); + } + finally { + if (outFile.exists()) { + outFile.delete(); + } + if (outTmpDir.exists()) { + outTmpDir.delete(); + } + } + } + + @Test + public void testDir() throws IOException, SegmentLoadingException + { + + final Path zipPath = new Path("/tmp/tmp2/test.txt"); + + final File outTmpDir = 
com.google.common.io.Files.createTempDir(); + outTmpDir.deleteOnExit(); + final File outFile = new File(outTmpDir, "test.txt"); + outFile.delete(); + + final URI uri = URI.create(uriBase.toString() + "/tmp/tmp2"); + + try (final OutputStream outputStream = miniCluster.getFileSystem().create(zipPath)) { + try (final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) { + ByteStreams.copy(inputStream, outputStream); + } + } + try { + Assert.assertFalse(outFile.exists()); + puller.getSegmentFiles(uri, outTmpDir); + Assert.assertTrue(outFile.exists()); + + Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath())); + } + finally { + if (outFile.exists()) { + outFile.delete(); + } + if (outTmpDir.exists()) { + outTmpDir.delete(); + } + } + } +} diff --git a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java index e36bdb9e4ab..01cf5742c8a 100644 --- a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java +++ b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java @@ -17,37 +17,125 @@ package io.druid.storage.s3; +import com.amazonaws.services.s3.AmazonS3URI; +import com.google.common.base.Predicate; +import com.google.common.base.Strings; import com.google.common.base.Throwables; -import com.google.common.io.ByteStreams; +import com.google.common.io.ByteSource; import com.google.common.io.Files; import com.google.inject.Inject; +import com.metamx.common.CompressionUtils; +import com.metamx.common.FileUtils; +import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.MapUtils; +import com.metamx.common.UOE; import com.metamx.common.logger.Logger; import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.segment.loading.URIDataPuller; import 
io.druid.timeline.DataSegment; -import io.druid.utils.CompressionUtils; -import org.apache.commons.io.FileUtils; import org.jets3t.service.S3ServiceException; +import org.jets3t.service.ServiceException; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.model.S3Object; +import javax.tools.FileObject; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.io.Reader; +import java.io.Writer; +import java.net.URI; +import java.nio.file.Paths; import java.util.Map; import java.util.concurrent.Callable; -import java.util.zip.GZIPInputStream; /** + * A data segment puller that also handles URI data pulls. */ -public class S3DataSegmentPuller implements DataSegmentPuller +public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller { + public static FileObject buildFileObject(final URI uri, final RestS3Service s3Client) throws S3ServiceException + { + final URI checkedUri = checkURI(uri); + final AmazonS3URI s3URI = new AmazonS3URI(checkedUri); + final String key = s3URI.getKey(); + final String bucket = s3URI.getBucket(); + final S3Object s3Obj = s3Client.getObject(bucket, key); + final String path = uri.getPath(); + + return new FileObject() + { + @Override + public URI toUri() + { + return uri; + } + + @Override + public String getName() + { + final String ext = Files.getFileExtension(path); + return Files.getNameWithoutExtension(path) + (Strings.isNullOrEmpty(ext) ? "" : ("." 
+ ext)); + } + + @Override + public InputStream openInputStream() throws IOException + { + try { + return s3Obj.getDataInputStream(); + } + catch (ServiceException e) { + throw new IOException(String.format("Could not load S3 URI [%s]", checkedUri.toString()), e); + } + } + + @Override + public OutputStream openOutputStream() throws IOException + { + throw new UOE("Cannot stream S3 output"); + } + + @Override + public Reader openReader(boolean ignoreEncodingErrors) throws IOException + { + throw new UOE("Cannot open reader"); + } + + @Override + public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException + { + throw new UOE("Cannot open character sequence"); + } + + @Override + public Writer openWriter() throws IOException + { + throw new UOE("Cannot open writer"); + } + + @Override + public long getLastModified() + { + return s3Obj.getLastModifiedDate().getTime(); + } + + @Override + public boolean delete() + { + throw new UOE("Cannot delete S3 items anonymously. 
jetS3t doesn't support authenticated deletes easily."); + } + }; + } + + public static final String scheme = S3StorageDruidModule.SCHEME; + private static final Logger log = new Logger(S3DataSegmentPuller.class); - private static final String BUCKET = "bucket"; - private static final String KEY = "key"; + protected static final String BUCKET = "bucket"; + protected static final String KEY = "key"; private final RestS3Service s3Client; @@ -62,7 +150,12 @@ public class S3DataSegmentPuller implements DataSegmentPuller @Override public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException { - final S3Coords s3Coords = new S3Coords(segment); + getSegmentFiles(new S3Coords(segment), outDir); + } + + public FileUtils.FileCopyResult getSegmentFiles(final S3Coords s3Coords, final File outDir) + throws SegmentLoadingException + { log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir); @@ -79,63 +172,134 @@ public class S3DataSegmentPuller implements DataSegmentPuller } try { - S3Utils.retryS3Operation( - new Callable() - { - @Override - public Void call() throws Exception - { - long startTime = System.currentTimeMillis(); - S3Object s3Obj = null; - - try { - s3Obj = s3Client.getObject(s3Coords.bucket, s3Coords.path); - - try (InputStream in = s3Obj.getDataInputStream()) { - final String key = s3Obj.getKey(); - if (key.endsWith(".zip")) { - CompressionUtils.unzip(in, outDir); - } else if (key.endsWith(".gz")) { - final File outFile = new File(outDir, toFilename(key, ".gz")); - ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile)); - } else { - ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, "")))); - } - log.info( - "Pull of file[%s/%s] completed in %,d millis", - s3Obj.getBucketName(), - s3Obj.getKey(), - System.currentTimeMillis() - startTime - ); - return null; - } - catch (IOException e) { - throw new IOException(String.format("Problem decompressing 
object[%s]", s3Obj), e); - } - } - finally { - S3Utils.closeStreamsQuietly(s3Obj); + final URI uri = URI.create(String.format("s3://%s/%s", s3Coords.bucket, s3Coords.path)); + final ByteSource byteSource = new ByteSource() + { + @Override + public InputStream openStream() throws IOException + { + try { + return buildFileObject(uri, s3Client).openInputStream(); + } + catch (ServiceException e) { + if (e.getCause() != null) { + if (S3Utils.S3RETRY.apply(e)) { + throw new IOException("Recoverable exception", e); } } + throw Throwables.propagate(e); } - ); + } + }; + if (CompressionUtils.isZip(s3Coords.path)) { + final FileUtils.FileCopyResult result = CompressionUtils.unzip( + byteSource, + outDir, + S3Utils.S3RETRY, + true + ); + log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outDir.getAbsolutePath()); + return result; + } + if (CompressionUtils.isGz(s3Coords.path)) { + final String fname = Paths.get(uri).getFileName().toString(); + final File outFile = new File(outDir, CompressionUtils.getGzBaseName(fname)); + + final FileUtils.FileCopyResult result = CompressionUtils.gunzip(byteSource, outFile); + log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outFile.getAbsolutePath()); + return result; + } + throw new IAE("Do not know how to load file type at [%s]", uri.toString()); } catch (Exception e) { try { - FileUtils.deleteDirectory(outDir); + org.apache.commons.io.FileUtils.deleteDirectory(outDir); } catch (IOException ioe) { log.warn( ioe, - "Failed to remove output directory for segment[%s] after exception: %s", - segment.getIdentifier(), - outDir + "Failed to remove output directory [%s] for segment pulled from [%s]", + outDir.getAbsolutePath(), + s3Coords.toString() ); } throw new SegmentLoadingException(e, e.getMessage()); } } + public static URI checkURI(URI uri) + { + if (uri.getScheme().equalsIgnoreCase(scheme)) { + uri = URI.create("s3" + uri.toString().substring(scheme.length())); + } else if 
(!uri.getScheme().equalsIgnoreCase("s3")) { + throw new IAE("Don't know how to load scheme for URI [%s]", uri.toString()); + } + return uri; + } + + @Override + public InputStream getInputStream(URI uri) throws IOException + { + try { + return buildFileObject(uri, s3Client).openInputStream(); + } + catch (ServiceException e) { + throw new IOException(String.format("Could not load URI [%s]", uri.toString()), e); + } + } + + @Override + public Predicate shouldRetryPredicate() + { + // Yay! smart retries! + return new Predicate() + { + @Override + public boolean apply(Throwable e) + { + if (e == null) { + return false; + } + if (e instanceof ServiceException) { + return S3Utils.isServiceExceptionRecoverable((ServiceException) e); + } + if (S3Utils.S3RETRY.apply(e)) { + return true; + } + // Look all the way down the cause chain, just in case something wraps it deep. + return apply(e.getCause()); + } + }; + } + + /** + * Returns the "version" (aka last modified timestamp) of the URI + * + * @param uri The URI to check the last timestamp + * + * @return The time in ms of the last modification of the URI in String format + * + * @throws IOException + */ + @Override + public String getVersion(URI uri) throws IOException + { + try { + return String.format("%d", buildFileObject(uri, s3Client).getLastModified()); + } + catch (S3ServiceException e) { + if (S3Utils.isServiceExceptionRecoverable(e)) { + // The recoverable logic is always true for IOException, so we want to only pass IOException if it is recoverable + throw new IOException( + String.format("Could not fetch last modified timestamp from URI [%s]", uri.toString()), + e + ); + } else { + throw Throwables.propagate(e); + } + } + } + private String toFilename(String key, final String suffix) { String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/' @@ -165,7 +329,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller } } - private static class S3Coords + protected static 
class S3Coords { String bucket; String path; @@ -180,6 +344,12 @@ public class S3DataSegmentPuller implements DataSegmentPuller } } + public S3Coords(String bucket, String key) + { + this.bucket = bucket; + this.path = key; + } + public String toString() { return String.format("s3://%s/%s", bucket, path); diff --git a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java index 78b02e42bbb..29de26d1b45 100644 --- a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java +++ b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java @@ -23,11 +23,11 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; import com.google.common.io.Files; import com.google.inject.Inject; +import com.metamx.common.CompressionUtils; import com.metamx.emitter.EmittingLogger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; import io.druid.timeline.DataSegment; -import io.druid.utils.CompressionUtils; import org.jets3t.service.ServiceException; import org.jets3t.service.acl.gs.GSAccessControlList; import org.jets3t.service.impl.rest.httpclient.RestS3Service; diff --git a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java new file mode 100644 index 00000000000..ae260dbda0c --- /dev/null +++ b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java @@ -0,0 +1,90 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.storage.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.elasticbeanstalk.model.S3LocationNotInServiceRegionException; +import com.amazonaws.services.s3.AmazonS3URI; +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.metamx.common.CompressionUtils; +import com.metamx.common.FileUtils; +import com.metamx.common.IAE; +import com.metamx.common.ISE; +import com.metamx.common.RetryUtils; +import com.metamx.common.StreamUtils; +import com.metamx.common.logger.Logger; +import io.druid.segment.loading.DataSegmentPuller; +import io.druid.segment.loading.LoadSpec; +import io.druid.segment.loading.SegmentLoadingException; +import org.jets3t.service.S3ServiceException; +import org.jets3t.service.ServiceException; + +import javax.swing.text.Segment; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +/** + * + */ 
+@JsonTypeName(S3StorageDruidModule.SCHEME) +public class S3LoadSpec implements LoadSpec +{ + @JsonProperty(S3DataSegmentPuller.BUCKET) + private final String bucket; + @JsonProperty(S3DataSegmentPuller.KEY) + private final String key; + + private final S3DataSegmentPuller puller; + + @JsonCreator + public S3LoadSpec( + @JacksonInject S3DataSegmentPuller puller, + @JsonProperty(S3DataSegmentPuller.BUCKET) String bucket, + @JsonProperty(S3DataSegmentPuller.KEY) String key + ) + { + Preconditions.checkNotNull(bucket); + Preconditions.checkNotNull(key); + this.bucket = bucket; + this.key = key; + this.puller = puller; + } + + @Override + public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException + { + return new LoadSpecResult(puller.getSegmentFiles(new S3DataSegmentPuller.S3Coords(bucket, key), outDir).size()); + } +} diff --git a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java index c1e7dae641f..bab2114ff6d 100644 --- a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java +++ b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java @@ -18,6 +18,7 @@ package io.druid.storage.s3; import com.amazonaws.auth.AWSCredentialsProvider; +import com.fasterxml.jackson.core.Version; import com.fasterxml.jackson.databind.Module; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; @@ -37,10 +38,32 @@ import java.util.List; */ public class S3StorageDruidModule implements DruidModule { + public static final String SCHEME = "s3_zip"; @Override public List getJacksonModules() { - return ImmutableList.of(); + return ImmutableList.of( + new Module() + { + @Override + public String getModuleName() + { + return "DruidS3-" + System.identityHashCode(this); + } + + @Override + public Version version() + { + return Version.unknownVersion(); + } + + @Override + 
public void setupModule(SetupContext context) + { + context.registerSubtypes(S3LoadSpec.class); + } + } + ); } @Override @@ -48,10 +71,10 @@ public class S3StorageDruidModule implements DruidModule { JsonConfigProvider.bind(binder, "druid.s3", AWSCredentialsConfig.class); - Binders.dataSegmentPullerBinder(binder).addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class); - Binders.dataSegmentKillerBinder(binder).addBinding("s3_zip").to(S3DataSegmentKiller.class).in(LazySingleton.class); - Binders.dataSegmentMoverBinder(binder).addBinding("s3_zip").to(S3DataSegmentMover.class).in(LazySingleton.class); - Binders.dataSegmentArchiverBinder(binder).addBinding("s3_zip").to(S3DataSegmentArchiver.class).in(LazySingleton.class); + Binders.dataSegmentPullerBinder(binder).addBinding(SCHEME).to(S3DataSegmentPuller.class).in(LazySingleton.class); + Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(S3DataSegmentKiller.class).in(LazySingleton.class); + Binders.dataSegmentMoverBinder(binder).addBinding(SCHEME).to(S3DataSegmentMover.class).in(LazySingleton.class); + Binders.dataSegmentArchiverBinder(binder).addBinding(SCHEME).to(S3DataSegmentArchiver.class).in(LazySingleton.class); Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class); JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class); diff --git a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java index b9fc5441e4f..fe66e838dab 100644 --- a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java +++ b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java @@ -19,6 +19,7 @@ package io.druid.storage.s3; import com.google.common.base.Joiner; import com.google.common.base.Predicate; +import 
com.metamx.common.FileUtils; import com.metamx.common.RetryUtils; import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; @@ -51,30 +52,38 @@ public class S3Utils } } + public static boolean isServiceExceptionRecoverable(ServiceException ex) + { + final boolean isIOException = ex.getCause() instanceof IOException; + final boolean isTimeout = "RequestTimeout".equals(((ServiceException) ex).getErrorCode()); + return isIOException || isTimeout; + } + + public static final Predicate S3RETRY = new Predicate() + { + @Override + public boolean apply(Throwable e) + { + if (e == null) { + return false; + } else if (e instanceof IOException) { + return true; + } else if (e instanceof ServiceException) { + return isServiceExceptionRecoverable((ServiceException) e); + } else { + return apply(e.getCause()); + } + } + }; + /** * Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not * found, etc) are not retried. 
*/ public static T retryS3Operation(Callable f) throws Exception { - final Predicate shouldRetry = new Predicate() - { - @Override - public boolean apply(Throwable e) - { - if (e instanceof IOException) { - return true; - } else if (e instanceof ServiceException) { - final boolean isIOException = e.getCause() instanceof IOException; - final boolean isTimeout = "RequestTimeout".equals(((ServiceException) e).getErrorCode()); - return isIOException || isTimeout; - } else { - return false; - } - } - }; final int maxTries = 10; - return RetryUtils.retry(f, shouldRetry, maxTries); + return RetryUtils.retry(f, S3RETRY, maxTries); } public static boolean isObjectInBucket(RestS3Service s3Client, String bucketName, String objectKey) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/SegmentLoaderFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/SegmentLoaderFactory.java index d14b5548545..8b60effdfb6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/SegmentLoaderFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/SegmentLoaderFactory.java @@ -18,7 +18,7 @@ package io.druid.indexing.common; import com.google.inject.Inject; -import io.druid.segment.loading.OmniSegmentLoader; +import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoader; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.StorageLocationConfig; @@ -30,11 +30,11 @@ import java.util.Arrays; */ public class SegmentLoaderFactory { - private final OmniSegmentLoader loader; + private final SegmentLoaderLocalCacheManager loader; @Inject public SegmentLoaderFactory( - OmniSegmentLoader loader + SegmentLoaderLocalCacheManager loader ) { this.loader = loader; diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index f869e56749c..261d888334e 
100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -31,7 +31,7 @@ import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.OmniSegmentLoader; +import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.SegmentLoaderConfig; @@ -68,7 +68,7 @@ public class TaskToolboxTest private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class); private ExecutorService mockQueryExecutorService = EasyMock.createMock(ExecutorService.class); private ObjectMapper ObjectMapper = new ObjectMapper(); - private OmniSegmentLoader mockOmniSegmentLoader = EasyMock.createMock(OmniSegmentLoader.class); + private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class); private Task task = EasyMock.createMock(Task.class); @Rule @@ -93,7 +93,7 @@ public class TaskToolboxTest mockQueryRunnerFactoryConglomerate, mockQueryExecutorService, mockMonitorScheduler, - new SegmentLoaderFactory(mockOmniSegmentLoader), + new SegmentLoaderFactory(mockSegmentLoaderLocalCacheManager), ObjectMapper ); } @@ -144,11 +144,11 @@ public class TaskToolboxTest public void testFetchSegments() throws SegmentLoadingException, IOException { File expectedFile = temporaryFolder.newFile(); - EasyMock.expect(mockOmniSegmentLoader.getSegmentFiles((DataSegment)EasyMock.anyObject())) + EasyMock.expect(mockSegmentLoaderLocalCacheManager.getSegmentFiles((DataSegment)EasyMock.anyObject())) .andReturn(expectedFile).anyTimes(); - 
EasyMock.expect(mockOmniSegmentLoader.withConfig((SegmentLoaderConfig)EasyMock.anyObject())) - .andReturn(mockOmniSegmentLoader).anyTimes(); - EasyMock.replay(mockOmniSegmentLoader); + EasyMock.expect(mockSegmentLoaderLocalCacheManager.withConfig((SegmentLoaderConfig)EasyMock.anyObject())) + .andReturn(mockSegmentLoaderLocalCacheManager).anyTimes(); + EasyMock.replay(mockSegmentLoaderLocalCacheManager); DataSegment dataSegment = DataSegment.builder().dataSource("source").interval(new Interval("2012-01-01/P1D")).version("1").size(1).build(); List segments = ImmutableList.of ( diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 1650929b156..2946345115f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -17,6 +17,11 @@ package io.druid.indexing.firehose; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair; +import com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector; +import com.fasterxml.jackson.databind.introspect.GuiceInjectableValues; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.api.client.repackaged.com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -40,6 +45,7 @@ import io.druid.data.input.impl.MapInputRowParser; import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.data.input.impl.TimestampSpec; import io.druid.granularity.QueryGranularity; +import io.druid.guice.GuiceInjectors; import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.TaskToolboxFactory; import 
io.druid.indexing.common.actions.LocalTaskActionClientFactory; @@ -60,11 +66,11 @@ import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; -import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.LocalDataSegmentPuller; -import io.druid.segment.loading.OmniSegmentLoader; +import io.druid.segment.loading.LocalLoadSpec; import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.StorageLocationConfig; import io.druid.timeline.DataSegment; @@ -169,6 +175,37 @@ public class IngestSegmentFirehoseFactoryTest ts, new TaskActionToolbox(tl, mdc, newMockEmitter()) ); + + final ObjectMapper objectMapper = new DefaultObjectMapper(); + objectMapper.registerModule( + new SimpleModule("testModule").registerSubtypes(LocalLoadSpec.class) + ); + + final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector(); + objectMapper.setAnnotationIntrospectors( + new AnnotationIntrospectorPair( + guiceIntrospector, objectMapper.getSerializationConfig().getAnnotationIntrospector() + ), + new AnnotationIntrospectorPair( + guiceIntrospector, objectMapper.getDeserializationConfig().getAnnotationIntrospector() + ) + ); + objectMapper.setInjectableValues( + new GuiceInjectableValues( + GuiceInjectors.makeStartupInjectorWithModules( + ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(LocalDataSegmentPuller.class); + } + } + ) + ) + ) + ); final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null), tac, @@ -224,11 +261,7 @@ public class IngestSegmentFirehoseFactoryTest 
null, // query executor service null, // monitor scheduler new SegmentLoaderFactory( - new OmniSegmentLoader( - ImmutableMap.of( - "local", - new LocalDataSegmentPuller() - ), + new SegmentLoaderLocalCacheManager( null, new SegmentLoaderConfig() { @@ -237,10 +270,10 @@ public class IngestSegmentFirehoseFactoryTest { return Lists.newArrayList(); } - } + }, objectMapper ) ), - new DefaultObjectMapper() + objectMapper ); Collection values = new LinkedList<>(); for (InputRowParser parser : Arrays.asList( diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 4a9a64045b5..aaa2e3c878a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -68,10 +68,8 @@ import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; -import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.LocalDataSegmentPuller; -import io.druid.segment.loading.OmniSegmentLoader; +import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.StorageLocationConfig; @@ -314,11 +312,7 @@ public class TaskLifecycleTest null, // query executor service null, // monitor scheduler new SegmentLoaderFactory( - new OmniSegmentLoader( - ImmutableMap.of( - "local", - new LocalDataSegmentPuller() - ), + new SegmentLoaderLocalCacheManager( null, new SegmentLoaderConfig() { @@ -327,7 +321,7 @@ public class TaskLifecycleTest { return Lists.newArrayList(); } - } + }, new DefaultObjectMapper() ) 
), new DefaultObjectMapper() diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 478ccbaf328..ddb38fd1d73 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -20,7 +20,6 @@ package io.druid.indexing.worker; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.Files; import io.druid.curator.PotentiallyGzippedCompressionProvider; @@ -36,9 +35,7 @@ import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig; import io.druid.indexing.overlord.ThreadPoolTaskRunner; import io.druid.indexing.worker.config.WorkerConfig; import io.druid.jackson.DefaultObjectMapper; -import io.druid.segment.loading.DataSegmentPuller; -import io.druid.segment.loading.LocalDataSegmentPuller; -import io.druid.segment.loading.OmniSegmentLoader; +import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.StorageLocationConfig; import io.druid.server.initialization.IndexerZkConfig; @@ -124,11 +121,7 @@ public class WorkerTaskMonitorTest new TaskToolboxFactory( new TaskConfig(tmp.toString(), null, null, 0, null), null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory( - new OmniSegmentLoader( - ImmutableMap.of( - "local", - new LocalDataSegmentPuller() - ), + new SegmentLoaderLocalCacheManager( null, new SegmentLoaderConfig() { @@ -138,7 +131,7 @@ public class WorkerTaskMonitorTest return Lists.newArrayList(); } } - ) + , jsonMapper) ), jsonMapper ), null diff --git a/pom.xml b/pom.xml index 
33adea93305..e09d7f80051 100644 --- a/pom.xml +++ b/pom.xml @@ -65,10 +65,10 @@ - 0.26.15 + 0.27.0 2.7.0 9.2.5.v20141112 - 0.3.5 + 0.3.6 2.4.4 2.2 1.7.10 diff --git a/processing/src/main/java/io/druid/segment/SegmentMissingException.java b/processing/src/main/java/io/druid/segment/SegmentMissingException.java index df1dc269a3b..f66a8c50617 100644 --- a/processing/src/main/java/io/druid/segment/SegmentMissingException.java +++ b/processing/src/main/java/io/druid/segment/SegmentMissingException.java @@ -24,4 +24,8 @@ public class SegmentMissingException extends ISE public SegmentMissingException(String formatText, Object... arguments) { super(String.format(formatText, arguments)); } + + public SegmentMissingException(Throwable cause, String formatText, Object... arguments){ + super(cause, formatText, arguments); + } } diff --git a/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java b/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java index 0015ccdf325..295ef0cdc21 100644 --- a/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java +++ b/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java @@ -17,26 +17,34 @@ package io.druid.guice; +import com.fasterxml.jackson.core.Version; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Key; -import com.google.inject.Module; +import io.druid.initialization.DruidModule; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.LocalDataSegmentKiller; import io.druid.segment.loading.LocalDataSegmentPuller; import io.druid.segment.loading.LocalDataSegmentPusher; import io.druid.segment.loading.LocalDataSegmentPusherConfig; -import io.druid.segment.loading.OmniSegmentLoader; +import io.druid.segment.loading.LocalLoadSpec; +import 
io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoader; +import java.util.List; + /** */ -public class LocalDataStorageDruidModule implements Module +public class LocalDataStorageDruidModule implements DruidModule { + public static final String SCHEME = "local"; + @Override public void configure(Binder binder) { - binder.bind(SegmentLoader.class).to(OmniSegmentLoader.class).in(LazySingleton.class); + binder.bind(SegmentLoader.class).to(SegmentLoaderLocalCacheManager.class).in(LazySingleton.class); bindDeepStorageLocal(binder); @@ -48,14 +56,14 @@ public class LocalDataStorageDruidModule implements Module private static void bindDeepStorageLocal(Binder binder) { Binders.dataSegmentPullerBinder(binder) - .addBinding("local") - .to(LocalDataSegmentPuller.class) - .in(LazySingleton.class); + .addBinding(SCHEME) + .to(LocalDataSegmentPuller.class) + .in(LazySingleton.class); PolyBind.optionBinder(binder, Key.get(DataSegmentKiller.class)) - .addBinding("local") - .to(LocalDataSegmentKiller.class) - .in(LazySingleton.class); + .addBinding(SCHEME) + .to(LocalDataSegmentKiller.class) + .in(LazySingleton.class); PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class)) .addBinding("local") @@ -63,4 +71,31 @@ public class LocalDataStorageDruidModule implements Module .in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.storage", LocalDataSegmentPusherConfig.class); } + + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new com.fasterxml.jackson.databind.Module() + { + @Override + public String getModuleName() + { + return "DruidLocalStorage-" + System.identityHashCode(this); + } + + @Override + public Version version() + { + return Version.unknownVersion(); + } + + @Override + public void setupModule(SetupContext context) + { + context.registerSubtypes(LocalLoadSpec.class); + } + } + ); + } } diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPuller.java 
b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPuller.java index 58873844e00..b56cd9db8f2 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPuller.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPuller.java @@ -17,58 +17,207 @@ package io.druid.segment.loading; +import com.google.common.base.Predicate; import com.google.common.io.Files; +import com.metamx.common.CompressionUtils; +import com.metamx.common.FileUtils; import com.metamx.common.MapUtils; +import com.metamx.common.UOE; import com.metamx.common.logger.Logger; import io.druid.timeline.DataSegment; -import io.druid.utils.CompressionUtils; +import javax.tools.FileObject; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.FileWriter; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Reader; +import java.io.Writer; +import java.net.URI; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Map; /** */ -public class LocalDataSegmentPuller implements DataSegmentPuller +public class LocalDataSegmentPuller implements DataSegmentPuller, URIDataPuller { + public static FileObject buildFileObject(final URI uri) + { + final Path path = Paths.get(uri); + final File file = path.toFile(); + return new FileObject() + { + @Override + public URI toUri() + { + return uri; + } + + @Override + public String getName() + { + return path.getFileName().toString(); + } + + @Override + public InputStream openInputStream() throws IOException + { + return new FileInputStream(file); + } + + @Override + public OutputStream openOutputStream() throws IOException + { + return new FileOutputStream(file); + } + + @Override + public Reader openReader(boolean ignoreEncodingErrors) throws IOException + { + return new FileReader(file); + } + + @Override + public CharSequence getCharContent(boolean ignoreEncodingErrors) throws 
IOException + { + throw new UOE("CharSequence not supported"); + } + + @Override + public Writer openWriter() throws IOException + { + return new FileWriter(file); + } + + @Override + public long getLastModified() + { + return file.lastModified(); + } + + @Override + public boolean delete() + { + return file.delete(); + } + }; + } + private static final Logger log = new Logger(LocalDataSegmentPuller.class); @Override public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException { - final File path = getFile(segment); + getSegmentFiles(getFile(segment), dir); + } - if (path.isDirectory()) { - if (path.equals(dir)) { + public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final File dir) throws SegmentLoadingException + { + if (sourceFile.isDirectory()) { + if (sourceFile.equals(dir)) { log.info("Asked to load [%s] into itself, done!", dir); - return; + return new FileUtils.FileCopyResult(sourceFile); } - log.info("Copying files from [%s] to [%s]", path, dir); - File file = null; - try { - final File[] files = path.listFiles(); - for (int i = 0; i < files.length; ++i) { - file = files[i]; - Files.copy(file, new File(dir, file.getName())); + final File[] files = sourceFile.listFiles(); + if (files == null) { + throw new SegmentLoadingException("No files found in [%s]", sourceFile.getAbsolutePath()); + } + final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult(sourceFile); + for (final File oldFile : files) { + if (oldFile.isDirectory()) { + log.info("[%s] is a child directory, skipping", oldFile.getAbsolutePath()); + continue; } - } - catch (IOException e) { - throw new SegmentLoadingException(e, "Unable to copy file[%s].", file); - } - } else { - if (!path.getName().endsWith(".zip")) { - throw new SegmentLoadingException("File is not a zip file[%s]", path); - } - log.info("Unzipping local file[%s] to [%s]", path, dir); + result.addFiles( + FileUtils.retryCopy( + Files.asByteSource(oldFile), + new File(dir, 
oldFile.getName()), + shouldRetryPredicate(), + 10 + ).getFiles() + ); + } + log.info( + "Coppied %d bytes from [%s] to [%s]", + result.size(), + sourceFile.getAbsolutePath(), + dir.getAbsolutePath() + ); + return result; + } + if (CompressionUtils.isZip(sourceFile.getName())) { try { - CompressionUtils.unzip(path, dir); + final FileUtils.FileCopyResult result = CompressionUtils.unzip( + Files.asByteSource(sourceFile), + dir, + shouldRetryPredicate(), + false + ); + log.info( + "Unzipped %d bytes from [%s] to [%s]", + result.size(), + sourceFile.getAbsolutePath(), + dir.getAbsolutePath() + ); + return result; } catch (IOException e) { - throw new SegmentLoadingException(e, "Unable to unzip file[%s]", path); + throw new SegmentLoadingException(e, "Unable to unzip file [%s]", sourceFile.getAbsolutePath()); } } + if (CompressionUtils.isGz(sourceFile.getName())) { + final File outFile = new File(dir, CompressionUtils.getGzBaseName(sourceFile.getName())); + final FileUtils.FileCopyResult result = CompressionUtils.gunzip( + Files.asByteSource(sourceFile), + outFile, + shouldRetryPredicate() + ); + log.info( + "Gunzipped %d bytes from [%s] to [%s]", + result.size(), + sourceFile.getAbsolutePath(), + outFile.getAbsolutePath() + ); + return result; + } + throw new SegmentLoadingException("Do not know how to handle source [%s]", sourceFile.getAbsolutePath()); + } + + + @Override + public InputStream getInputStream(URI uri) throws IOException + { + return buildFileObject(uri).openInputStream(); + } + + /** + * Returns the "version" (aka last modified timestamp) of the URI of interest + * + * @param uri The URI to check the last modified timestamp + * + * @return The last modified timestamp in ms of the URI in String format + */ + @Override + public String getVersion(URI uri) + { + return String.format("%d", buildFileObject(uri).getLastModified()); + } + + @Override + public Predicate shouldRetryPredicate() + { + // It would be nice if there were better logic for smarter 
retries. For example: If the error is that the file is + // not found, there's only so much that retries would do (unless the file was temporarily absent for some reason). + // Since this is not a commonly used puller in production, and in general is more useful in testing/debugging, + // I do not have a good sense of what kind of Exceptions people would expect to encounter in the wild + return FileUtils.IS_EXCEPTION; } private File getFile(DataSegment segment) throws SegmentLoadingException diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java index b55ac34fcc0..17b5db0ebf3 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java @@ -22,10 +22,10 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; import com.google.common.io.Files; import com.google.inject.Inject; +import com.metamx.common.CompressionUtils; import com.metamx.common.logger.Logger; import io.druid.segment.SegmentUtils; import io.druid.timeline.DataSegment; -import io.druid.utils.CompressionUtils; import java.io.File; import java.io.IOException; diff --git a/server/src/main/java/io/druid/segment/loading/LocalLoadSpec.java b/server/src/main/java/io/druid/segment/loading/LocalLoadSpec.java new file mode 100644 index 00000000000..b0ab3a71329 --- /dev/null +++ b/server/src/main/java/io/druid/segment/loading/LocalLoadSpec.java @@ -0,0 +1,64 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.segment.loading; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import io.druid.guice.LocalDataStorageDruidModule; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +/** + * {@link LoadSpec} for the "local" scheme: requires the given path to exist at construction time and loads a segment by passing that path to {@link LocalDataSegmentPuller#getSegmentFiles(File, File)}, reporting the copied size. + */ +@JsonTypeName(LocalDataStorageDruidModule.SCHEME) +public class LocalLoadSpec implements LoadSpec +{ + private final Path path; + private final LocalDataSegmentPuller puller; + + @JsonCreator + public LocalLoadSpec( + @JacksonInject LocalDataSegmentPuller puller, + @JsonProperty(value = "path", required = true) final String path + ) + { + Preconditions.checkNotNull(path); + this.path = Paths.get(path); + Preconditions.checkArgument(Files.exists(this.path), "[%s] does not exist", path); + this.puller = puller; + } + + @JsonProperty + public String getPath() + { + return path.toString(); + } + + @Override + public LoadSpecResult loadSegment(final File outDir) throws SegmentLoadingException + { + return new LoadSpecResult(puller.getSegmentFiles(path.toFile(), outDir).size()); + } +} diff --git a/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java similarity index 79% rename from server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java rename to
server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java index aecd3820234..570a50f6cc5 100644 --- a/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -17,11 +17,12 @@ package io.druid.segment.loading; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.inject.Inject; import com.metamx.common.ISE; -import com.metamx.common.MapUtils; import com.metamx.common.logger.Logger; +import io.druid.guice.annotations.Json; import io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndexSegment; import io.druid.segment.Segment; @@ -32,32 +33,31 @@ import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.List; -import java.util.Map; /** */ -public class OmniSegmentLoader implements SegmentLoader +public class SegmentLoaderLocalCacheManager implements SegmentLoader { - private static final Logger log = new Logger(OmniSegmentLoader.class); + private static final Logger log = new Logger(SegmentLoaderLocalCacheManager.class); - private final Map pullers; private final QueryableIndexFactory factory; private final SegmentLoaderConfig config; + private final ObjectMapper jsonMapper; private final List locations; private final Object lock = new Object(); @Inject - public OmniSegmentLoader( - Map pullers, + public SegmentLoaderLocalCacheManager( QueryableIndexFactory factory, - SegmentLoaderConfig config + SegmentLoaderConfig config, + @Json ObjectMapper mapper ) { - this.pullers = pullers; this.factory = factory; this.config = config; + this.jsonMapper = mapper; this.locations = Lists.newArrayList(); for (StorageLocationConfig locationConfig : config.getLocations()) { @@ -65,9 +65,9 @@ public class OmniSegmentLoader implements SegmentLoader } } - public OmniSegmentLoader withConfig(SegmentLoaderConfig config) + public 
SegmentLoaderLocalCacheManager withConfig(SegmentLoaderConfig config) { - return new OmniSegmentLoader(pullers, factory, config); + return new SegmentLoaderLocalCacheManager(factory, config, jsonMapper); } @Override @@ -127,22 +127,26 @@ public class OmniSegmentLoader implements SegmentLoader log.debug("Unable to make parent file[%s]", storageDir); } try { - downloadStartMarker.createNewFile(); + if (!downloadStartMarker.createNewFile()) { + throw new SegmentLoadingException("Was not able to create new download marker for [%s]", storageDir); + } } catch (IOException e) { - throw new SegmentLoadingException("Unable to create marker file for [%s]", storageDir); + throw new SegmentLoadingException(e, "Unable to create marker file for [%s]", storageDir); } } - getPuller(segment.getLoadSpec()).getSegmentFiles(segment, storageDir); + // LoadSpec isn't materialized until here so that any system can interpret Segment without having to have all the LoadSpec dependencies. + final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(), LoadSpec.class); + final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir); + if(result.getSize() != segment.getSize()){ + log.warn("Segment [%s] is different than expected size. Expected [%d] found [%d]", segment.getIdentifier(), segment.getSize(), result.getSize()); + } if (!downloadStartMarker.delete()) { throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir); } - - loc.addSegment(segment); - retVal = storageDir; } else { retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); @@ -179,18 +183,6 @@ public class OmniSegmentLoader implements SegmentLoader } } - private DataSegmentPuller getPuller(Map loadSpec) throws SegmentLoadingException - { - String type = MapUtils.getString(loadSpec, "type"); - DataSegmentPuller loader = pullers.get(type); - - if (loader == null) { - throw new SegmentLoadingException("Unknown loader type[%s]. 
Known types are %s", type, pullers.keySet()); - } - - return loader; - } - public void cleanupCacheFiles(File baseFile, File cacheFile) throws IOException { if (cacheFile.equals(baseFile)) { diff --git a/server/src/test/java/io/druid/segment/loading/LoadSpecTest.java b/server/src/test/java/io/druid/segment/loading/LoadSpecTest.java new file mode 100644 index 00000000000..8a8f8703070 --- /dev/null +++ b/server/src/test/java/io/druid/segment/loading/LoadSpecTest.java @@ -0,0 +1,108 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.druid.segment.loading; + +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.core.Version; +import com.fasterxml.jackson.databind.BeanProperty; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair; +import com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector; +import com.fasterxml.jackson.databind.introspect.GuiceInjectableValues; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.metamx.common.IAE; +import io.druid.guice.GuiceInjectors; +import io.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Collection; + +/** + * + */ +@RunWith(Parameterized.class) +public class LoadSpecTest +{ + @Parameterized.Parameters + public static Collection getParameters() + { + return ImmutableList.of( + new Object[]{"{\"path\":\"/\",\"type\":\"local\"}", "local"} + ); + } + + private final String value; + private final String expectedId; + + public LoadSpecTest(String value, String expectedId) + { + this.value = value; + this.expectedId = expectedId; + } + + private static ObjectMapper mapper; + + @BeforeClass + public static void setUp() + { + final Injector injector = GuiceInjectors.makeStartupInjectorWithModules( + ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(LocalDataSegmentPuller.class); + } + } + ) + ); + mapper = new DefaultObjectMapper(); + mapper.registerModule( 
new SimpleModule("loadSpecTest").registerSubtypes(LocalLoadSpec.class)); + mapper.setInjectableValues(new GuiceInjectableValues(injector)); + + final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector(); + mapper.setAnnotationIntrospectors( + new AnnotationIntrospectorPair( + guiceIntrospector, mapper.getSerializationConfig().getAnnotationIntrospector() + ), + new AnnotationIntrospectorPair( + guiceIntrospector, mapper.getDeserializationConfig().getAnnotationIntrospector() + ) + ); + } + + @Test + public void testStringResolve() throws IOException + { + LoadSpec loadSpec = mapper.readValue(value, LoadSpec.class); + Assert.assertEquals(expectedId, loadSpec.getClass().getAnnotation(JsonTypeName.class).value()); + } +} diff --git a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPullerTest.java b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPullerTest.java new file mode 100644 index 00000000000..55a2742c91e --- /dev/null +++ b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPullerTest.java @@ -0,0 +1,152 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.druid.segment.loading; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import com.metamx.common.CompressionUtils; +import io.druid.jackson.DefaultObjectMapper; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; +import java.util.zip.GZIPOutputStream; + +/** + * + */ +public class LocalDataSegmentPullerTest +{ + private File tmpDir; + private LocalDataSegmentPuller puller; + + @Before + public void setup() + { + tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + puller = new LocalDataSegmentPuller(); + } + + @After + public void tearDown() throws IOException + { + deleteFiles(tmpDir); + } + + public static void deleteFiles(File... files) throws IOException + { + IOException ex = null; + for (File file : files) { + if (file == null || !file.exists()) { + continue; + } + if (!file.delete()) { + IOException e = new IOException("Could not delete " + file.getAbsolutePath()); + if (ex == null) { + ex = e; + } else { + ex.addSuppressed(e); + } + } + } + if (ex != null) { + throw ex; + } + } + + @Test + public void simpleZipTest() throws IOException, SegmentLoadingException + { + File file = new File(tmpDir, "test1data"); + File zipFile = File.createTempFile("ziptest", ".zip"); + file.deleteOnExit(); + zipFile.deleteOnExit(); + zipFile.delete(); + try { + try (OutputStream outputStream = new FileOutputStream(file)) { + outputStream.write(new byte[0]); + outputStream.flush(); + } + CompressionUtils.zip(tmpDir, zipFile); + file.delete(); + + Assert.assertFalse(file.exists()); + Assert.assertTrue(zipFile.exists()); + puller.getSegmentFiles(zipFile, tmpDir); + Assert.assertTrue(file.exists()); + } + finally { + deleteFiles(file, zipFile); + } + } + + @Test + public void simpleGZTest() 
throws IOException, SegmentLoadingException + { + File zipFile = File.createTempFile("gztest", ".gz"); + File unZipFile = new File( + tmpDir, + Files.getNameWithoutExtension( + zipFile.getAbsolutePath() + ) + ); + unZipFile.delete(); + zipFile.deleteOnExit(); + zipFile.delete(); + try { + try (OutputStream fOutStream = new FileOutputStream(zipFile)) { + try (OutputStream outputStream = new GZIPOutputStream(fOutStream)) { + outputStream.write(new byte[0]); + outputStream.flush(); + } + } + + Assert.assertTrue(zipFile.exists()); + Assert.assertFalse(unZipFile.exists()); + puller.getSegmentFiles(zipFile, tmpDir); + Assert.assertTrue(unZipFile.exists()); + }finally{ + deleteFiles(zipFile, unZipFile); + } + } + + @Test + public void simpleDirectoryTest() throws IOException, SegmentLoadingException + { + File srcDir = Files.createTempDir(); + File tmpFile = File.createTempFile("test", "file", srcDir); + File expectedOutput = new File(tmpDir, Files.getNameWithoutExtension(tmpFile.getAbsolutePath())); + expectedOutput.delete(); + try{ + Assert.assertFalse(expectedOutput.exists()); + puller.getSegmentFiles(srcDir, tmpDir); + Assert.assertTrue(expectedOutput.exists()); + }finally{ + deleteFiles(expectedOutput, tmpFile, srcDir); + } + } +} +