Add URI handling to SegmentPullers

* Requires https://github.com/druid-io/druid-api/pull/37
* Requires https://github.com/metamx/java-util/pull/22
* Moves the puller logic to use a more standard workflow going through java-util helpers instead of re-writing the handlers for each impl
  * General workflow goes like this: 1) LoadSpec makes sure the correct Puller is called with the correct parameters. 2) The Puller sets up general information like how to make an InputStream, how to find a file name (for .gz files for example), and when to retry. 3) CompressionUtils does most of the heavy lifting when it can
This commit is contained in:
Charles Allen 2015-02-17 16:37:28 -08:00
parent 9c741d58f4
commit 6d407e8677
30 changed files with 1805 additions and 281 deletions

View File

@ -22,6 +22,9 @@ import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.metamx.common.IAE;
import java.lang.reflect.Type;
/**
*/
@ -36,6 +39,13 @@ public class GuiceInjectableValues extends InjectableValues
Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
)
{
return injector.getInstance((Key) valueId);
// From the docs: "Object that identifies value to inject; may be a simple name or more complex identifier object,
// whatever provider needs"
// Currently we should only be dealing with `Key` instances, and anything more advanced should be handled with
// great care
if(valueId instanceof Key){
return injector.getInstance((Key) valueId);
}
throw new IAE("Unknown class type [%s] for valueId [%s]", valueId.getClass().getCanonicalName(), valueId.toString());
}
}

View File

@ -17,79 +17,105 @@
package io.druid.storage.cassandra;
import com.google.common.io.Files;
import com.google.common.base.Predicates;
import com.google.inject.Inject;
import com.metamx.common.CompressionUtils;
import com.metamx.common.ISE;
import com.metamx.common.RetryUtils;
import com.metamx.common.logger.Logger;
import com.netflix.astyanax.recipes.storage.ChunkedStorage;
import com.netflix.astyanax.recipes.storage.ObjectMetadata;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Callable;
/**
* Cassandra Segment Puller
*
* @author boneill42
*/
public class CassandraDataSegmentPuller extends CassandraStorage implements DataSegmentPuller
{
private static final Logger log = new Logger(CassandraDataSegmentPuller.class);
private static final int CONCURRENCY = 10;
private static final int BATCH_SIZE = 10;
private static final Logger log = new Logger(CassandraDataSegmentPuller.class);
private static final int CONCURRENCY = 10;
private static final int BATCH_SIZE = 10;
@Inject
public CassandraDataSegmentPuller(CassandraDataSegmentConfig config)
{
super(config);
}
public CassandraDataSegmentPuller(CassandraDataSegmentConfig config)
{
super(config);
}
@Override
public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
{
String key = (String) segment.getLoadSpec().get("key");
log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir);
@Override
public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
{
String key = (String) segment.getLoadSpec().get("key");
getSegmentFiles(key, outDir);
}
public com.metamx.common.FileUtils.FileCopyResult getSegmentFiles(final String key, final File outDir) throws SegmentLoadingException{
log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir);
if (!outDir.exists()) {
outDir.mkdirs();
}
if (!outDir.exists())
{
outDir.mkdirs();
}
if (!outDir.isDirectory()) {
throw new ISE("outDir[%s] must be a directory.", outDir);
}
if (!outDir.isDirectory())
{
throw new ISE("outDir[%s] must be a directory.", outDir);
}
long startTime = System.currentTimeMillis();
final File tmpFile = new File(outDir, "index.zip");
log.info("Pulling to temporary local cache [%s]", tmpFile.getAbsolutePath());
long startTime = System.currentTimeMillis();
ObjectMetadata meta = null;
final File outFile = new File(outDir, "index.zip");
try
{
try
{
log.info("Writing to [%s]", outFile.getAbsolutePath());
OutputStream os = Files.newOutputStreamSupplier(outFile).getOutput();
meta = ChunkedStorage
.newReader(indexStorage, key, os)
.withBatchSize(BATCH_SIZE)
.withConcurrencyLevel(CONCURRENCY)
.call();
os.close();
CompressionUtils.unzip(outFile, outDir);
} catch (Exception e)
{
FileUtils.deleteDirectory(outDir);
}
} catch (Exception e)
{
throw new SegmentLoadingException(e, e.getMessage());
}
log.info("Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime,
meta.getObjectSize());
}
final com.metamx.common.FileUtils.FileCopyResult localResult;
try {
localResult = RetryUtils.retry(
new Callable<com.metamx.common.FileUtils.FileCopyResult>()
{
@Override
public com.metamx.common.FileUtils.FileCopyResult call() throws Exception
{
try (OutputStream os = new FileOutputStream(tmpFile)) {
final ObjectMetadata meta = ChunkedStorage
.newReader(indexStorage, key, os)
.withBatchSize(BATCH_SIZE)
.withConcurrencyLevel(CONCURRENCY)
.call();
}
return new com.metamx.common.FileUtils.FileCopyResult(tmpFile);
}
},
Predicates.<Throwable>alwaysTrue(),
10
);
}catch (Exception e){
throw new SegmentLoadingException(e, "Unable to copy key [%s] to file [%s]", key, tmpFile.getAbsolutePath());
}
try{
final com.metamx.common.FileUtils.FileCopyResult result = CompressionUtils.unzip(tmpFile, outDir);
log.info(
"Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime,
result.size()
);
return result;
}
catch (Exception e) {
try {
FileUtils.deleteDirectory(outDir);
}
catch (IOException e1) {
log.error(e1, "Error clearing segment directory [%s]", outDir.getAbsolutePath());
e.addSuppressed(e1);
}
throw new SegmentLoadingException(e, e.getMessage());
} finally {
if(!tmpFile.delete()){
log.warn("Could not delete cache file at [%s]", tmpFile.getAbsolutePath());
}
}
}
}

View File

@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.metamx.common.CompressionUtils;
import com.metamx.common.logger.Logger;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.recipes.storage.ChunkedStorage;
@ -28,7 +29,6 @@ import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import java.io.File;
import java.io.FileInputStream;

View File

@ -17,7 +17,7 @@
package io.druid.storage.cassandra;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.core.Version;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
@ -34,24 +34,47 @@ import java.util.List;
*/
public class CassandraDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of();
}
public static final String SCHEME = "c*";
@Override
public void configure(Binder binder)
{
Binders.dataSegmentPullerBinder(binder)
.addBinding("c*")
.to(CassandraDataSegmentPuller.class)
.in(LazySingleton.class);
.addBinding(SCHEME)
.to(CassandraDataSegmentPuller.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class))
.addBinding("c*")
.addBinding(SCHEME)
.to(CassandraDataSegmentPusher.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.storage", CassandraDataSegmentConfig.class);
}
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return ImmutableList.of(
new com.fasterxml.jackson.databind.Module()
{
@Override
public String getModuleName()
{
return "DruidCassandraStorage-" + System.identityHashCode(this);
}
@Override
public Version version()
{
return Version.unknownVersion();
}
@Override
public void setupModule(SetupContext context)
{
context.registerSubtypes(CassandraLoadSpec.class);
}
}
);
}
}

View File

@ -0,0 +1,54 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.cassandra;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.segment.loading.LoadSpec;
import io.druid.segment.loading.SegmentLoadingException;
import java.io.File;
/**
*
*/
@JsonTypeName(CassandraDruidModule.SCHEME)
public class CassandraLoadSpec implements LoadSpec
{
@JsonProperty
private final String key;
private final CassandraDataSegmentPuller puller;
@JsonCreator
public CassandraLoadSpec(
@JacksonInject CassandraDataSegmentPuller puller,
@JsonProperty("key") String key
)
{
this.puller = puller;
this.key = key;
}
@Override
public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException
{
return new LoadSpecResult(puller.getSegmentFiles(key, outDir).size());
}
}

View File

@ -66,10 +66,36 @@
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.3.0</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.3.0</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.3.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -17,23 +17,144 @@
package io.druid.storage.hdfs;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.metamx.common.CompressionUtils;
import com.metamx.common.FileUtils;
import com.metamx.common.IAE;
import com.metamx.common.RetryUtils;
import com.metamx.common.UOE;
import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.URIDataPuller;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import javax.tools.FileObject;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.net.URI;
import java.util.ArrayList;
import java.util.concurrent.Callable;
/**
*/
public class HdfsDataSegmentPuller implements DataSegmentPuller
public class HdfsDataSegmentPuller implements DataSegmentPuller, URIDataPuller
{
/**
* FileObject.getLastModified and FileObject.delete don't throw IOException. This allows us to wrap those calls
*/
public static class HdfsIOException extends RuntimeException
{
private final IOException cause;
public HdfsIOException(IOException ex)
{
super(ex);
this.cause = ex;
}
protected IOException getIOException()
{
return cause;
}
}
public static FileObject buildFileObject(final URI uri, final Configuration config)
{
return buildFileObject(uri, config, false);
}
public static FileObject buildFileObject(final URI uri, final Configuration config, final Boolean overwrite)
{
return new FileObject()
{
final Path path = new Path(uri);
@Override
public URI toUri()
{
return uri;
}
@Override
public String getName()
{
return path.getName();
}
@Override
public InputStream openInputStream() throws IOException
{
final FileSystem fs = path.getFileSystem(config);
return fs.open(path);
}
@Override
public OutputStream openOutputStream() throws IOException
{
final FileSystem fs = path.getFileSystem(config);
return fs.create(path, overwrite);
}
@Override
public Reader openReader(boolean ignoreEncodingErrors) throws IOException
{
throw new UOE("HDFS Reader not supported");
}
@Override
public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException
{
throw new UOE("HDFS CharSequence not supported");
}
@Override
public Writer openWriter() throws IOException
{
throw new UOE("HDFS Writer not supported");
}
@Override
public long getLastModified()
{
try {
final FileSystem fs = path.getFileSystem(config);
return fs.getFileStatus(path).getModificationTime();
}
catch (IOException ex) {
throw new HdfsIOException(ex);
}
}
@Override
public boolean delete()
{
try {
final FileSystem fs = path.getFileSystem(config);
return fs.delete(path, false);
}
catch (IOException ex) {
throw new HdfsIOException(ex);
}
}
};
}
private static final Logger log = new Logger(HdfsDataSegmentPuller.class);
private final Configuration config;
@Inject
@ -42,46 +163,190 @@ public class HdfsDataSegmentPuller implements DataSegmentPuller
this.config = config;
}
@Override
public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException
{
final Path path = getPath(segment);
getSegmentFiles(getPath(segment), dir);
}
final FileSystem fs = checkPathAndGetFilesystem(path);
public FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) throws SegmentLoadingException
{
final LocalFileSystem localFileSystem = new LocalFileSystem();
try {
final FileSystem fs = path.getFileSystem(config);
if (fs.isDirectory(path)) {
if (path.getName().endsWith(".zip")) {
try {
try (FSDataInputStream in = fs.open(path)) {
CompressionUtils.unzip(in, dir);
// -------- directory ---------
try {
return RetryUtils.retry(
new Callable<FileUtils.FileCopyResult>()
{
@Override
public FileUtils.FileCopyResult call() throws Exception
{
if (!fs.exists(path)) {
throw new SegmentLoadingException("No files found at [%s]", path.toString());
}
final RemoteIterator<LocatedFileStatus> children = fs.listFiles(path, false);
final ArrayList<FileUtils.FileCopyResult> localChildren = new ArrayList<>();
final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult();
while (children.hasNext()) {
final LocatedFileStatus child = children.next();
final Path childPath = child.getPath();
final String fname = childPath.getName();
if (fs.isDirectory(childPath)) {
log.warn("[%s] is a child directory, skipping", childPath.toString());
} else {
final File outFile = new File(outDir, fname);
// Actual copy
fs.copyToLocalFile(childPath, new Path(outFile.toURI()));
result.addFile(outFile);
}
}
log.info(
"Copied %d bytes from [%s] to [%s]",
result.size(),
path.toString(),
outDir.getAbsolutePath()
);
return result;
}
},
shouldRetryPredicate(),
10
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
} else if (CompressionUtils.isZip(path.getName())) {
// -------- zip ---------
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return getInputStream(path);
}
}, outDir, shouldRetryPredicate(), false
);
log.info(
"Unzipped %d bytes from [%s] to [%s]",
result.size(),
path.toString(),
outDir.getAbsolutePath()
);
return result;
} else if (CompressionUtils.isGz(path.getName())) {
// -------- gzip ---------
final String fname = path.getName();
final File outFile = new File(outDir, CompressionUtils.getGzBaseName(fname));
final FileUtils.FileCopyResult result = CompressionUtils.gunzip(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return getInputStream(path);
}
},
outFile
);
log.info(
"Gunzipped %d bytes from [%s] to [%s]",
result.size(),
path.toString(),
outFile.getAbsolutePath()
);
return result;
} else {
throw new SegmentLoadingException("Do not know how to handle file type at [%s]", path.toString());
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Some IOException");
}
} else {
throw new SegmentLoadingException("Unknown file type[%s]", path);
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Error loading [%s]", path.toString());
}
}
public FileUtils.FileCopyResult getSegmentFiles(URI uri, File outDir) throws SegmentLoadingException
{
if (!uri.getScheme().equalsIgnoreCase(HdfsStorageDruidModule.SCHEME)) {
throw new SegmentLoadingException("Don't know how to load SCHEME for URI [%s]", uri.toString());
}
return getSegmentFiles(new Path(uri), outDir);
}
public InputStream getInputStream(Path path) throws IOException
{
return buildFileObject(path.toUri(), config).openInputStream();
}
@Override
public InputStream getInputStream(URI uri) throws IOException
{
if (!uri.getScheme().equalsIgnoreCase(HdfsStorageDruidModule.SCHEME)) {
throw new IAE("Don't know how to load SCHEME [%s] for URI [%s]", uri.getScheme(), uri.toString());
}
return buildFileObject(uri, config).openInputStream();
}
/**
* Return the "version" (aka last modified timestamp) of the URI
*
* @param uri The URI of interest
*
* @return The last modified timestamp of the uri in String format
*
* @throws IOException
*/
@Override
public String getVersion(URI uri) throws IOException
{
try {
return String.format("%d", buildFileObject(uri, config).getLastModified());
}
catch (HdfsIOException ex) {
throw ex.getIOException();
}
}
@Override
public Predicate<Throwable> shouldRetryPredicate()
{
return new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable input)
{
if (input == null) {
return false;
}
if (input instanceof HdfsIOException) {
return true;
}
if (input instanceof IOException) {
return true;
}
return apply(input.getCause());
}
};
}
private Path getPath(DataSegment segment)
{
return new Path(String.valueOf(segment.getLoadSpec().get("path")));
}
private FileSystem checkPathAndGetFilesystem(Path path) throws SegmentLoadingException
{
FileSystem fs;
try {
fs = path.getFileSystem(config);
if (!fs.exists(path)) {
throw new SegmentLoadingException("Path[%s] doesn't exist.", path);
}
return fs;
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Problems interacting with filesystem[%s].", path);
}
}
}

View File

@ -22,12 +22,12 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSink;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.metamx.common.CompressionUtils;
import com.metamx.common.logger.Logger;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;

View File

@ -0,0 +1,61 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.hdfs;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.metamx.common.ISE;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.LoadSpec;
import io.druid.segment.loading.SegmentLoadingException;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.util.Map;
/**
*
*/
@JsonTypeName(HdfsStorageDruidModule.SCHEME)
public class HdfsLoadSpec implements LoadSpec
{
private final Path path;
final HdfsDataSegmentPuller puller;
@JsonCreator
public HdfsLoadSpec(
@JacksonInject HdfsDataSegmentPuller puller,
@JsonProperty(value = "path", required = true) String path
){
Preconditions.checkNotNull(path);
this.path = new Path(path);
this.puller = puller;
}
@JsonProperty("path")
public final String getPathString(){
return path.toString();
}
@Override
public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException
{
return new LoadSpecResult(puller.getSegmentFiles(path, outDir).size());
}
}

View File

@ -17,6 +17,7 @@
package io.druid.storage.hdfs;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
@ -36,6 +37,7 @@ import java.util.Properties;
*/
public class HdfsStorageDruidModule implements DruidModule
{
public static final String SCHEME = "hdfs";
private Properties props = null;
@Inject
@ -47,15 +49,36 @@ public class HdfsStorageDruidModule implements DruidModule
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of();
return ImmutableList.of(
new Module()
{
@Override
public String getModuleName()
{
return "DruidHDFSStorage-" + System.identityHashCode(this);
}
@Override
public Version version()
{
return Version.unknownVersion();
}
@Override
public void setupModule(SetupContext context)
{
context.registerSubtypes(HdfsLoadSpec.class);
}
}
);
}
@Override
public void configure(Binder binder)
{
Binders.dataSegmentPullerBinder(binder).addBinding("hdfs").to(HdfsDataSegmentPuller.class).in(LazySingleton.class);
Binders.dataSegmentPusherBinder(binder).addBinding("hdfs").to(HdfsDataSegmentPusher.class).in(LazySingleton.class);
Binders.dataSegmentKillerBinder(binder).addBinding("hdfs").to(HdfsDataSegmentKiller.class).in(LazySingleton.class);
Binders.dataSegmentPullerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentPuller.class).in(LazySingleton.class);
Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentPusher.class).in(LazySingleton.class);
Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentKiller.class).in(LazySingleton.class);
final Configuration conf = new Configuration();
if (props != null) {

View File

@ -0,0 +1,220 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.segment.loading;
import com.google.common.io.ByteStreams;
import com.metamx.common.CompressionUtils;
import com.metamx.common.StringUtils;
import io.druid.storage.hdfs.HdfsDataSegmentPuller;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.file.Files;
import java.util.zip.GZIPOutputStream;
/**
*
*/
public class HdfsDataSegmentPullerTest
{
private static MiniDFSCluster miniCluster;
private static File hdfsTmpDir;
private static URI uriBase;
private static Path filePath = new Path("/tmp/foo");
private static String pathContents = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum";
private static byte[] pathByteContents = StringUtils.toUtf8(pathContents);
private static Configuration conf;
@BeforeClass
public static void setupStatic() throws IOException, ClassNotFoundException
{
hdfsTmpDir = File.createTempFile("hdfsHandlerTest", "dir");
hdfsTmpDir.deleteOnExit();
if (!hdfsTmpDir.delete()) {
throw new IOException(String.format("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath()));
}
conf = new Configuration(true);
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath());
miniCluster = new MiniDFSCluster.Builder(conf).build();
uriBase = miniCluster.getURI(0);
final File tmpFile = File.createTempFile("hdfsHandlerTest", ".data");
tmpFile.delete();
try {
tmpFile.deleteOnExit();
Files.copy(new ByteArrayInputStream(pathByteContents), tmpFile.toPath());
try (OutputStream stream = miniCluster.getFileSystem().create(filePath)) {
Files.copy(tmpFile.toPath(), stream);
}
}
finally {
tmpFile.delete();
}
}
@AfterClass
public static void tearDownStatic() throws IOException
{
if (miniCluster != null) {
miniCluster.shutdown(true);
}
}
private HdfsDataSegmentPuller puller;
@Before
public void setUp()
{
puller = new HdfsDataSegmentPuller(conf);
}
@Test
public void testZip() throws IOException, SegmentLoadingException
{
final File tmpDir = com.google.common.io.Files.createTempDir();
tmpDir.deleteOnExit();
final File tmpFile = File.createTempFile("zipContents", ".txt", tmpDir);
tmpFile.deleteOnExit();
final Path zipPath = new Path("/tmp/testZip.zip");
final File outTmpDir = com.google.common.io.Files.createTempDir();
outTmpDir.deleteOnExit();
final URI uri = URI.create(uriBase.toString() + zipPath.toString());
tmpFile.deleteOnExit();
try (final OutputStream stream = new FileOutputStream(tmpFile)) {
ByteStreams.copy(new ByteArrayInputStream(pathByteContents), stream);
}
Assert.assertTrue(tmpFile.exists());
final File outFile = new File(outTmpDir, tmpFile.getName());
outFile.delete();
try (final OutputStream stream = miniCluster.getFileSystem().create(zipPath)) {
CompressionUtils.zip(tmpDir, stream);
}
try {
Assert.assertFalse(outFile.exists());
puller.getSegmentFiles(uri, outTmpDir);
Assert.assertTrue(outFile.exists());
Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath()));
}
finally {
if (tmpFile.exists()) {
tmpFile.delete();
}
if (outFile.exists()) {
outFile.delete();
}
if (outTmpDir.exists()) {
outTmpDir.delete();
}
if (tmpDir.exists()) {
tmpDir.delete();
}
}
}
@Test
public void testGZ() throws IOException, SegmentLoadingException
{
final Path zipPath = new Path("/tmp/testZip.gz");
final File outTmpDir = com.google.common.io.Files.createTempDir();
outTmpDir.deleteOnExit();
final File outFile = new File(outTmpDir, "testZip");
outFile.delete();
final URI uri = URI.create(uriBase.toString() + zipPath.toString());
try (final OutputStream outputStream = miniCluster.getFileSystem().create(zipPath)) {
try (final OutputStream gzStream = new GZIPOutputStream(outputStream)) {
try (final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
ByteStreams.copy(inputStream, gzStream);
}
}
}
try {
Assert.assertFalse(outFile.exists());
puller.getSegmentFiles(uri, outTmpDir);
Assert.assertTrue(outFile.exists());
Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath()));
}
finally {
if (outFile.exists()) {
outFile.delete();
}
if (outTmpDir.exists()) {
outTmpDir.delete();
}
}
}
@Test
public void testDir() throws IOException, SegmentLoadingException
{
final Path zipPath = new Path("/tmp/tmp2/test.txt");
final File outTmpDir = com.google.common.io.Files.createTempDir();
outTmpDir.deleteOnExit();
final File outFile = new File(outTmpDir, "test.txt");
outFile.delete();
final URI uri = URI.create(uriBase.toString() + "/tmp/tmp2");
try (final OutputStream outputStream = miniCluster.getFileSystem().create(zipPath)) {
try (final InputStream inputStream = new ByteArrayInputStream(pathByteContents)) {
ByteStreams.copy(inputStream, outputStream);
}
}
try {
Assert.assertFalse(outFile.exists());
puller.getSegmentFiles(uri, outTmpDir);
Assert.assertTrue(outFile.exists());
Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath()));
}
finally {
if (outFile.exists()) {
outFile.delete();
}
if (outTmpDir.exists()) {
outTmpDir.delete();
}
}
}
}

View File

@ -17,37 +17,125 @@
package io.druid.storage.s3;
import com.amazonaws.services.s3.AmazonS3URI;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import com.google.inject.Inject;
import com.metamx.common.CompressionUtils;
import com.metamx.common.FileUtils;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.MapUtils;
import com.metamx.common.UOE;
import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.URIDataPuller;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import org.apache.commons.io.FileUtils;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import javax.tools.FileObject;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.net.URI;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.zip.GZIPInputStream;
/**
* A data segment puller that also hanldes URI data pulls.
*/
public class S3DataSegmentPuller implements DataSegmentPuller
public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller
{
public static FileObject buildFileObject(final URI uri, final RestS3Service s3Client) throws S3ServiceException
{
final URI checkedUri = checkURI(uri);
final AmazonS3URI s3URI = new AmazonS3URI(checkedUri);
final String key = s3URI.getKey();
final String bucket = s3URI.getBucket();
final S3Object s3Obj = s3Client.getObject(bucket, key);
final String path = uri.getPath();
return new FileObject()
{
@Override
public URI toUri()
{
return uri;
}
@Override
public String getName()
{
final String ext = Files.getFileExtension(path);
return Files.getNameWithoutExtension(path) + (Strings.isNullOrEmpty(ext) ? "" : ("." + ext));
}
@Override
public InputStream openInputStream() throws IOException
{
try {
return s3Obj.getDataInputStream();
}
catch (ServiceException e) {
throw new IOException(String.format("Could not load S3 URI [%s]", checkedUri.toString()), e);
}
}
@Override
public OutputStream openOutputStream() throws IOException
{
throw new UOE("Cannot stream S3 output");
}
@Override
public Reader openReader(boolean ignoreEncodingErrors) throws IOException
{
throw new UOE("Cannot open reader");
}
@Override
public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException
{
throw new UOE("Cannot open character sequence");
}
@Override
public Writer openWriter() throws IOException
{
throw new UOE("Cannot open writer");
}
@Override
public long getLastModified()
{
return s3Obj.getLastModifiedDate().getTime();
}
@Override
public boolean delete()
{
throw new UOE("Cannot delete S3 items anonymously. jetS3t doesn't support authenticated deletes easily.");
}
};
}
public static final String scheme = S3StorageDruidModule.SCHEME;
private static final Logger log = new Logger(S3DataSegmentPuller.class);
private static final String BUCKET = "bucket";
private static final String KEY = "key";
protected static final String BUCKET = "bucket";
protected static final String KEY = "key";
private final RestS3Service s3Client;
@ -62,7 +150,12 @@ public class S3DataSegmentPuller implements DataSegmentPuller
@Override
public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException
{
final S3Coords s3Coords = new S3Coords(segment);
getSegmentFiles(new S3Coords(segment), outDir);
}
public FileUtils.FileCopyResult getSegmentFiles(final S3Coords s3Coords, final File outDir)
throws SegmentLoadingException
{
log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir);
@ -79,63 +172,134 @@ public class S3DataSegmentPuller implements DataSegmentPuller
}
try {
S3Utils.retryS3Operation(
new Callable<Void>()
{
@Override
public Void call() throws Exception
{
long startTime = System.currentTimeMillis();
S3Object s3Obj = null;
try {
s3Obj = s3Client.getObject(s3Coords.bucket, s3Coords.path);
try (InputStream in = s3Obj.getDataInputStream()) {
final String key = s3Obj.getKey();
if (key.endsWith(".zip")) {
CompressionUtils.unzip(in, outDir);
} else if (key.endsWith(".gz")) {
final File outFile = new File(outDir, toFilename(key, ".gz"));
ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile));
} else {
ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, ""))));
}
log.info(
"Pull of file[%s/%s] completed in %,d millis",
s3Obj.getBucketName(),
s3Obj.getKey(),
System.currentTimeMillis() - startTime
);
return null;
}
catch (IOException e) {
throw new IOException(String.format("Problem decompressing object[%s]", s3Obj), e);
}
}
finally {
S3Utils.closeStreamsQuietly(s3Obj);
final URI uri = URI.create(String.format("s3://%s/%s", s3Coords.bucket, s3Coords.path));
final ByteSource byteSource = new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
try {
return buildFileObject(uri, s3Client).openInputStream();
}
catch (ServiceException e) {
if (e.getCause() != null) {
if (S3Utils.S3RETRY.apply(e)) {
throw new IOException("Recoverable exception", e);
}
}
throw Throwables.propagate(e);
}
);
}
};
if (CompressionUtils.isZip(s3Coords.path)) {
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
byteSource,
outDir,
S3Utils.S3RETRY,
true
);
log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outDir.getAbsolutePath());
return result;
}
if (CompressionUtils.isGz(s3Coords.path)) {
final String fname = Paths.get(uri).getFileName().toString();
final File outFile = new File(outDir, CompressionUtils.getGzBaseName(fname));
final FileUtils.FileCopyResult result = CompressionUtils.gunzip(byteSource, outFile);
log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outFile.getAbsolutePath());
return result;
}
throw new IAE("Do not know how to load file type at [%s]", uri.toString());
}
catch (Exception e) {
try {
FileUtils.deleteDirectory(outDir);
org.apache.commons.io.FileUtils.deleteDirectory(outDir);
}
catch (IOException ioe) {
log.warn(
ioe,
"Failed to remove output directory for segment[%s] after exception: %s",
segment.getIdentifier(),
outDir
"Failed to remove output directory [%s] for segment pulled from [%s]",
outDir.getAbsolutePath(),
s3Coords.toString()
);
}
throw new SegmentLoadingException(e, e.getMessage());
}
}
public static URI checkURI(URI uri)
{
if (uri.getScheme().equalsIgnoreCase(scheme)) {
uri = URI.create("s3" + uri.toString().substring(scheme.length()));
} else if (!uri.getScheme().equalsIgnoreCase("s3")) {
throw new IAE("Don't know how to load scheme for URI [%s]", uri.toString());
}
return uri;
}
@Override
public InputStream getInputStream(URI uri) throws IOException
{
try {
return buildFileObject(uri, s3Client).openInputStream();
}
catch (ServiceException e) {
throw new IOException(String.format("Could not load URI [%s]", uri.toString()), e);
}
}
@Override
public Predicate<Throwable> shouldRetryPredicate()
{
// Yay! smart retries!
return new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable e)
{
if (e == null) {
return false;
}
if (e instanceof ServiceException) {
return S3Utils.isServiceExceptionRecoverable((ServiceException) e);
}
if (S3Utils.S3RETRY.apply(e)) {
return true;
}
// Look all the way down the cause chain, just in case something wraps it deep.
return apply(e.getCause());
}
};
}
/**
* Returns the "version" (aka last modified timestamp) of the URI
*
* @param uri The URI to check the last timestamp
*
* @return The time in ms of the last modification of the URI in String format
*
* @throws IOException
*/
@Override
public String getVersion(URI uri) throws IOException
{
try {
return String.format("%d", buildFileObject(uri, s3Client).getLastModified());
}
catch (S3ServiceException e) {
if (S3Utils.isServiceExceptionRecoverable(e)) {
// The recoverable logic is always true for IOException, so we want to only pass IOException if it is recoverable
throw new IOException(
String.format("Could not fetch last modified timestamp from URI [%s]", uri.toString()),
e
);
} else {
throw Throwables.propagate(e);
}
}
}
private String toFilename(String key, final String suffix)
{
String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/'
@ -165,7 +329,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller
}
}
private static class S3Coords
protected static class S3Coords
{
String bucket;
String path;
@ -180,6 +344,12 @@ public class S3DataSegmentPuller implements DataSegmentPuller
}
}
public S3Coords(String bucket, String key)
{
this.bucket = bucket;
this.path = key;
}
public String toString()
{
return String.format("s3://%s/%s", bucket, path);

View File

@ -23,11 +23,11 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.inject.Inject;
import com.metamx.common.CompressionUtils;
import com.metamx.emitter.EmittingLogger;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import org.jets3t.service.ServiceException;
import org.jets3t.service.acl.gs.GSAccessControlList;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;

View File

@ -0,0 +1,90 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.s3;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.elasticbeanstalk.model.S3LocationNotInServiceRegionException;
import com.amazonaws.services.s3.AmazonS3URI;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.metamx.common.CompressionUtils;
import com.metamx.common.FileUtils;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.RetryUtils;
import com.metamx.common.StreamUtils;
import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.LoadSpec;
import io.druid.segment.loading.SegmentLoadingException;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import javax.swing.text.Segment;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
*
*/
@JsonTypeName(S3StorageDruidModule.SCHEME)
public class S3LoadSpec implements LoadSpec
{
@JsonProperty(S3DataSegmentPuller.BUCKET)
private final String bucket;
@JsonProperty(S3DataSegmentPuller.KEY)
private final String key;
private final S3DataSegmentPuller puller;
@JsonCreator
public S3LoadSpec(
@JacksonInject S3DataSegmentPuller puller,
@JsonProperty(S3DataSegmentPuller.BUCKET) String bucket,
@JsonProperty(S3DataSegmentPuller.KEY) String key
)
{
Preconditions.checkNotNull(bucket);
Preconditions.checkNotNull(key);
this.bucket = bucket;
this.key = key;
this.puller = puller;
}
@Override
public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException
{
return new LoadSpecResult(puller.getSegmentFiles(new S3DataSegmentPuller.S3Coords(bucket, key), outDir).size());
}
}

View File

@ -18,6 +18,7 @@
package io.druid.storage.s3;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
@ -37,10 +38,32 @@ import java.util.List;
*/
public class S3StorageDruidModule implements DruidModule
{
public static final String SCHEME = "s3_zip";
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of();
return ImmutableList.of(
new Module()
{
@Override
public String getModuleName()
{
return "DruidS3-" + System.identityHashCode(this);
}
@Override
public Version version()
{
return Version.unknownVersion();
}
@Override
public void setupModule(SetupContext context)
{
context.registerSubtypes(S3LoadSpec.class);
}
}
);
}
@Override
@ -48,10 +71,10 @@ public class S3StorageDruidModule implements DruidModule
{
JsonConfigProvider.bind(binder, "druid.s3", AWSCredentialsConfig.class);
Binders.dataSegmentPullerBinder(binder).addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class);
Binders.dataSegmentKillerBinder(binder).addBinding("s3_zip").to(S3DataSegmentKiller.class).in(LazySingleton.class);
Binders.dataSegmentMoverBinder(binder).addBinding("s3_zip").to(S3DataSegmentMover.class).in(LazySingleton.class);
Binders.dataSegmentArchiverBinder(binder).addBinding("s3_zip").to(S3DataSegmentArchiver.class).in(LazySingleton.class);
Binders.dataSegmentPullerBinder(binder).addBinding(SCHEME).to(S3DataSegmentPuller.class).in(LazySingleton.class);
Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(S3DataSegmentKiller.class).in(LazySingleton.class);
Binders.dataSegmentMoverBinder(binder).addBinding(SCHEME).to(S3DataSegmentMover.class).in(LazySingleton.class);
Binders.dataSegmentArchiverBinder(binder).addBinding(SCHEME).to(S3DataSegmentArchiver.class).in(LazySingleton.class);
Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class);
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class);

View File

@ -19,6 +19,7 @@ package io.druid.storage.s3;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.metamx.common.FileUtils;
import com.metamx.common.RetryUtils;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
@ -51,30 +52,38 @@ public class S3Utils
}
}
public static boolean isServiceExceptionRecoverable(ServiceException ex)
{
final boolean isIOException = ex.getCause() instanceof IOException;
final boolean isTimeout = "RequestTimeout".equals(((ServiceException) ex).getErrorCode());
return isIOException || isTimeout;
}
public static final Predicate<Throwable> S3RETRY = new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable e)
{
if (e == null) {
return false;
} else if (e instanceof IOException) {
return true;
} else if (e instanceof ServiceException) {
return isServiceExceptionRecoverable((ServiceException) e);
} else {
return apply(e.getCause());
}
}
};
/**
* Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
* found, etc) are not retried.
*/
public static <T> T retryS3Operation(Callable<T> f) throws Exception
{
final Predicate<Throwable> shouldRetry = new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable e)
{
if (e instanceof IOException) {
return true;
} else if (e instanceof ServiceException) {
final boolean isIOException = e.getCause() instanceof IOException;
final boolean isTimeout = "RequestTimeout".equals(((ServiceException) e).getErrorCode());
return isIOException || isTimeout;
} else {
return false;
}
}
};
final int maxTries = 10;
return RetryUtils.retry(f, shouldRetry, maxTries);
return RetryUtils.retry(f, S3RETRY, maxTries);
}
public static boolean isObjectInBucket(RestS3Service s3Client, String bucketName, String objectKey)

View File

@ -18,7 +18,7 @@
package io.druid.indexing.common;
import com.google.inject.Inject;
import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.StorageLocationConfig;
@ -30,11 +30,11 @@ import java.util.Arrays;
*/
public class SegmentLoaderFactory
{
private final OmniSegmentLoader loader;
private final SegmentLoaderLocalCacheManager loader;
@Inject
public SegmentLoaderFactory(
OmniSegmentLoader loader
SegmentLoaderLocalCacheManager loader
)
{
this.loader = loader;

View File

@ -31,7 +31,7 @@ import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.SegmentLoaderConfig;
@ -68,7 +68,7 @@ public class TaskToolboxTest
private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class);
private ExecutorService mockQueryExecutorService = EasyMock.createMock(ExecutorService.class);
private ObjectMapper ObjectMapper = new ObjectMapper();
private OmniSegmentLoader mockOmniSegmentLoader = EasyMock.createMock(OmniSegmentLoader.class);
private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class);
private Task task = EasyMock.createMock(Task.class);
@Rule
@ -93,7 +93,7 @@ public class TaskToolboxTest
mockQueryRunnerFactoryConglomerate,
mockQueryExecutorService,
mockMonitorScheduler,
new SegmentLoaderFactory(mockOmniSegmentLoader),
new SegmentLoaderFactory(mockSegmentLoaderLocalCacheManager),
ObjectMapper
);
}
@ -144,11 +144,11 @@ public class TaskToolboxTest
public void testFetchSegments() throws SegmentLoadingException, IOException
{
File expectedFile = temporaryFolder.newFile();
EasyMock.expect(mockOmniSegmentLoader.getSegmentFiles((DataSegment)EasyMock.anyObject()))
EasyMock.expect(mockSegmentLoaderLocalCacheManager.getSegmentFiles((DataSegment)EasyMock.anyObject()))
.andReturn(expectedFile).anyTimes();
EasyMock.expect(mockOmniSegmentLoader.withConfig((SegmentLoaderConfig)EasyMock.anyObject()))
.andReturn(mockOmniSegmentLoader).anyTimes();
EasyMock.replay(mockOmniSegmentLoader);
EasyMock.expect(mockSegmentLoaderLocalCacheManager.withConfig((SegmentLoaderConfig)EasyMock.anyObject()))
.andReturn(mockSegmentLoaderLocalCacheManager).anyTimes();
EasyMock.replay(mockSegmentLoaderLocalCacheManager);
DataSegment dataSegment = DataSegment.builder().dataSource("source").interval(new Interval("2012-01-01/P1D")).version("1").size(1).build();
List<DataSegment> segments = ImmutableList.of
(

View File

@ -17,6 +17,11 @@
package io.druid.indexing.firehose;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector;
import com.fasterxml.jackson.databind.introspect.GuiceInjectableValues;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -40,6 +45,7 @@ import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.guice.GuiceInjectors;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
@ -60,11 +66,11 @@ import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.LocalLoadSpec;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.timeline.DataSegment;
@ -169,6 +175,37 @@ public class IngestSegmentFirehoseFactoryTest
ts,
new TaskActionToolbox(tl, mdc, newMockEmitter())
);
final ObjectMapper objectMapper = new DefaultObjectMapper();
objectMapper.registerModule(
new SimpleModule("testModule").registerSubtypes(LocalLoadSpec.class)
);
final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
objectMapper.setAnnotationIntrospectors(
new AnnotationIntrospectorPair(
guiceIntrospector, objectMapper.getSerializationConfig().getAnnotationIntrospector()
),
new AnnotationIntrospectorPair(
guiceIntrospector, objectMapper.getDeserializationConfig().getAnnotationIntrospector()
)
);
objectMapper.setInjectableValues(
new GuiceInjectableValues(
GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.of(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(LocalDataSegmentPuller.class);
}
}
)
)
)
);
final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null),
tac,
@ -224,11 +261,7 @@ public class IngestSegmentFirehoseFactoryTest
null, // query executor service
null, // monitor scheduler
new SegmentLoaderFactory(
new OmniSegmentLoader(
ImmutableMap.<String, DataSegmentPuller>of(
"local",
new LocalDataSegmentPuller()
),
new SegmentLoaderLocalCacheManager(
null,
new SegmentLoaderConfig()
{
@ -237,10 +270,10 @@ public class IngestSegmentFirehoseFactoryTest
{
return Lists.newArrayList();
}
}
}, objectMapper
)
),
new DefaultObjectMapper()
objectMapper
);
Collection<Object[]> values = new LinkedList<>();
for (InputRowParser parser : Arrays.<InputRowParser>asList(

View File

@ -68,10 +68,8 @@ import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.StorageLocationConfig;
@ -314,11 +312,7 @@ public class TaskLifecycleTest
null, // query executor service
null, // monitor scheduler
new SegmentLoaderFactory(
new OmniSegmentLoader(
ImmutableMap.<String, DataSegmentPuller>of(
"local",
new LocalDataSegmentPuller()
),
new SegmentLoaderLocalCacheManager(
null,
new SegmentLoaderConfig()
{
@ -327,7 +321,7 @@ public class TaskLifecycleTest
{
return Lists.newArrayList();
}
}
}, new DefaultObjectMapper()
)
),
new DefaultObjectMapper()

View File

@ -20,7 +20,6 @@ package io.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
@ -36,9 +35,7 @@ import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
import io.druid.indexing.overlord.ThreadPoolTaskRunner;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.server.initialization.IndexerZkConfig;
@ -124,11 +121,7 @@ public class WorkerTaskMonitorTest
new TaskToolboxFactory(
new TaskConfig(tmp.toString(), null, null, 0, null),
null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
new OmniSegmentLoader(
ImmutableMap.<String, DataSegmentPuller>of(
"local",
new LocalDataSegmentPuller()
),
new SegmentLoaderLocalCacheManager(
null,
new SegmentLoaderConfig()
{
@ -138,7 +131,7 @@ public class WorkerTaskMonitorTest
return Lists.newArrayList();
}
}
)
, jsonMapper)
), jsonMapper
),
null

View File

@ -65,10 +65,10 @@
</scm>
<properties>
<metamx.java-util.version>0.26.15</metamx.java-util.version>
<metamx.java-util.version>0.27.0</metamx.java-util.version>
<apache.curator.version>2.7.0</apache.curator.version>
<jetty.version>9.2.5.v20141112</jetty.version>
<druid.api.version>0.3.5</druid.api.version>
<druid.api.version>0.3.6</druid.api.version>
<jackson.version>2.4.4</jackson.version>
<log4j.version>2.2</log4j.version>
<slf4j.version>1.7.10</slf4j.version>

View File

@ -24,4 +24,8 @@ public class SegmentMissingException extends ISE
public SegmentMissingException(String formatText, Object... arguments) {
super(String.format(formatText, arguments));
}
public SegmentMissingException(Throwable cause, String formatText, Object... arguments){
super(cause, formatText, arguments);
}
}

View File

@ -17,26 +17,34 @@
package io.druid.guice;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import io.druid.initialization.DruidModule;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentKiller;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.loading.LocalDataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentPusherConfig;
import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.LocalLoadSpec;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.SegmentLoader;
import java.util.List;
/**
*/
public class LocalDataStorageDruidModule implements Module
public class LocalDataStorageDruidModule implements DruidModule
{
public static final String SCHEME = "local";
@Override
public void configure(Binder binder)
{
binder.bind(SegmentLoader.class).to(OmniSegmentLoader.class).in(LazySingleton.class);
binder.bind(SegmentLoader.class).to(SegmentLoaderLocalCacheManager.class).in(LazySingleton.class);
bindDeepStorageLocal(binder);
@ -48,14 +56,14 @@ public class LocalDataStorageDruidModule implements Module
private static void bindDeepStorageLocal(Binder binder)
{
Binders.dataSegmentPullerBinder(binder)
.addBinding("local")
.to(LocalDataSegmentPuller.class)
.in(LazySingleton.class);
.addBinding(SCHEME)
.to(LocalDataSegmentPuller.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DataSegmentKiller.class))
.addBinding("local")
.to(LocalDataSegmentKiller.class)
.in(LazySingleton.class);
.addBinding(SCHEME)
.to(LocalDataSegmentKiller.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class))
.addBinding("local")
@ -63,4 +71,31 @@ public class LocalDataStorageDruidModule implements Module
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.storage", LocalDataSegmentPusherConfig.class);
}
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return ImmutableList.of(
new com.fasterxml.jackson.databind.Module()
{
@Override
public String getModuleName()
{
return "DruidLocalStorage-" + System.identityHashCode(this);
}
@Override
public Version version()
{
return Version.unknownVersion();
}
@Override
public void setupModule(SetupContext context)
{
context.registerSubtypes(LocalLoadSpec.class);
}
}
);
}
}

View File

@ -17,58 +17,207 @@
package io.druid.segment.loading;
import com.google.common.base.Predicate;
import com.google.common.io.Files;
import com.metamx.common.CompressionUtils;
import com.metamx.common.FileUtils;
import com.metamx.common.MapUtils;
import com.metamx.common.UOE;
import com.metamx.common.logger.Logger;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import javax.tools.FileObject;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
/**
*/
public class LocalDataSegmentPuller implements DataSegmentPuller
public class LocalDataSegmentPuller implements DataSegmentPuller, URIDataPuller
{
public static FileObject buildFileObject(final URI uri)
{
final Path path = Paths.get(uri);
final File file = path.toFile();
return new FileObject()
{
@Override
public URI toUri()
{
return uri;
}
@Override
public String getName()
{
return path.getFileName().toString();
}
@Override
public InputStream openInputStream() throws IOException
{
return new FileInputStream(file);
}
@Override
public OutputStream openOutputStream() throws IOException
{
return new FileOutputStream(file);
}
@Override
public Reader openReader(boolean ignoreEncodingErrors) throws IOException
{
return new FileReader(file);
}
@Override
public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException
{
throw new UOE("CharSequence not supported");
}
@Override
public Writer openWriter() throws IOException
{
return new FileWriter(file);
}
@Override
public long getLastModified()
{
return file.lastModified();
}
@Override
public boolean delete()
{
return file.delete();
}
};
}
private static final Logger log = new Logger(LocalDataSegmentPuller.class);
@Override
public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException
{
final File path = getFile(segment);
getSegmentFiles(getFile(segment), dir);
}
if (path.isDirectory()) {
if (path.equals(dir)) {
public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final File dir) throws SegmentLoadingException
{
if (sourceFile.isDirectory()) {
if (sourceFile.equals(dir)) {
log.info("Asked to load [%s] into itself, done!", dir);
return;
return new FileUtils.FileCopyResult(sourceFile);
}
log.info("Copying files from [%s] to [%s]", path, dir);
File file = null;
try {
final File[] files = path.listFiles();
for (int i = 0; i < files.length; ++i) {
file = files[i];
Files.copy(file, new File(dir, file.getName()));
final File[] files = sourceFile.listFiles();
if (files == null) {
throw new SegmentLoadingException("No files found in [%s]", sourceFile.getAbsolutePath());
}
final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult(sourceFile);
for (final File oldFile : files) {
if (oldFile.isDirectory()) {
log.info("[%s] is a child directory, skipping", oldFile.getAbsolutePath());
continue;
}
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to copy file[%s].", file);
}
} else {
if (!path.getName().endsWith(".zip")) {
throw new SegmentLoadingException("File is not a zip file[%s]", path);
}
log.info("Unzipping local file[%s] to [%s]", path, dir);
result.addFiles(
FileUtils.retryCopy(
Files.asByteSource(oldFile),
new File(dir, oldFile.getName()),
shouldRetryPredicate(),
10
).getFiles()
);
}
log.info(
"Coppied %d bytes from [%s] to [%s]",
result.size(),
sourceFile.getAbsolutePath(),
dir.getAbsolutePath()
);
return result;
}
if (CompressionUtils.isZip(sourceFile.getName())) {
try {
CompressionUtils.unzip(path, dir);
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
Files.asByteSource(sourceFile),
dir,
shouldRetryPredicate(),
false
);
log.info(
"Unzipped %d bytes from [%s] to [%s]",
result.size(),
sourceFile.getAbsolutePath(),
dir.getAbsolutePath()
);
return result;
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to unzip file[%s]", path);
throw new SegmentLoadingException(e, "Unable to unzip file [%s]", sourceFile.getAbsolutePath());
}
}
if (CompressionUtils.isGz(sourceFile.getName())) {
final File outFile = new File(dir, CompressionUtils.getGzBaseName(sourceFile.getName()));
final FileUtils.FileCopyResult result = CompressionUtils.gunzip(
Files.asByteSource(sourceFile),
outFile,
shouldRetryPredicate()
);
log.info(
"Gunzipped %d bytes from [%s] to [%s]",
result.size(),
sourceFile.getAbsolutePath(),
outFile.getAbsolutePath()
);
return result;
}
throw new SegmentLoadingException("Do not know how to handle source [%s]", sourceFile.getAbsolutePath());
}
@Override
public InputStream getInputStream(URI uri) throws IOException
{
return buildFileObject(uri).openInputStream();
}
/**
* Returns the "version" (aka last modified timestamp) of the URI of interest
*
* @param uri The URI to check the last modified timestamp
*
* @return The last modified timestamp in ms of the URI in String format
*/
@Override
public String getVersion(URI uri)
{
return String.format("%d", buildFileObject(uri).getLastModified());
}
@Override
public Predicate<Throwable> shouldRetryPredicate()
{
// It would be nice if there were better logic for smarter retries. For example: If the error is that the file is
// not found, there's only so much that retries would do (unless the file was temporarily absent for some reason).
// Since this is not a commonly used puller in production, and in general is more useful in testing/debugging,
// I do not have a good sense of what kind of Exceptions people would expect to encounter in the wild
return FileUtils.IS_EXCEPTION;
}
private File getFile(DataSegment segment) throws SegmentLoadingException

View File

@ -22,10 +22,10 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.inject.Inject;
import com.metamx.common.CompressionUtils;
import com.metamx.common.logger.Logger;
import io.druid.segment.SegmentUtils;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import java.io.File;
import java.io.IOException;

View File

@ -0,0 +1,64 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.segment.loading;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.api.client.util.Preconditions;
import io.druid.guice.LocalDataStorageDruidModule;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
/**
*
*/
@JsonTypeName(LocalDataStorageDruidModule.SCHEME)
public class LocalLoadSpec implements LoadSpec
{
private final Path path;
private final LocalDataSegmentPuller puller;
@JsonCreator
public LocalLoadSpec(
@JacksonInject LocalDataSegmentPuller puller,
@JsonProperty(value = "path", required = true) final String path
)
{
Preconditions.checkNotNull(path);
this.path = Paths.get(path);
Preconditions.checkArgument(Files.exists(Paths.get(path)), "[%s] does not exist", path);
this.puller = puller;
}
@JsonProperty
public String getPath()
{
return path.toString();
}
@Override
public LoadSpecResult loadSegment(final File outDir) throws SegmentLoadingException
{
return new LoadSpecResult(puller.getSegmentFiles(path.toFile(), outDir).size());
}
}

View File

@ -17,11 +17,12 @@
package io.druid.segment.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import io.druid.guice.annotations.Json;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
@ -32,32 +33,31 @@ import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*/
public class OmniSegmentLoader implements SegmentLoader
public class SegmentLoaderLocalCacheManager implements SegmentLoader
{
private static final Logger log = new Logger(OmniSegmentLoader.class);
private static final Logger log = new Logger(SegmentLoaderLocalCacheManager.class);
private final Map<String, DataSegmentPuller> pullers;
private final QueryableIndexFactory factory;
private final SegmentLoaderConfig config;
private final ObjectMapper jsonMapper;
private final List<StorageLocation> locations;
private final Object lock = new Object();
@Inject
public OmniSegmentLoader(
Map<String, DataSegmentPuller> pullers,
public SegmentLoaderLocalCacheManager(
QueryableIndexFactory factory,
SegmentLoaderConfig config
SegmentLoaderConfig config,
@Json ObjectMapper mapper
)
{
this.pullers = pullers;
this.factory = factory;
this.config = config;
this.jsonMapper = mapper;
this.locations = Lists.newArrayList();
for (StorageLocationConfig locationConfig : config.getLocations()) {
@ -65,9 +65,9 @@ public class OmniSegmentLoader implements SegmentLoader
}
}
public OmniSegmentLoader withConfig(SegmentLoaderConfig config)
public SegmentLoaderLocalCacheManager withConfig(SegmentLoaderConfig config)
{
return new OmniSegmentLoader(pullers, factory, config);
return new SegmentLoaderLocalCacheManager(factory, config, jsonMapper);
}
@Override
@ -127,22 +127,26 @@ public class OmniSegmentLoader implements SegmentLoader
log.debug("Unable to make parent file[%s]", storageDir);
}
try {
downloadStartMarker.createNewFile();
if (!downloadStartMarker.createNewFile()) {
throw new SegmentLoadingException("Was not able to create new download marker for [%s]", storageDir);
}
}
catch (IOException e) {
throw new SegmentLoadingException("Unable to create marker file for [%s]", storageDir);
throw new SegmentLoadingException(e, "Unable to create marker file for [%s]", storageDir);
}
}
getPuller(segment.getLoadSpec()).getSegmentFiles(segment, storageDir);
// LoadSpec isn't materialized until here so that any system can interpret Segment without having to have all the LoadSpec dependencies.
final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(), LoadSpec.class);
final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir);
if(result.getSize() != segment.getSize()){
log.warn("Segment [%s] is different than expected size. Expected [%d] found [%d]", segment.getIdentifier(), segment.getSize(), result.getSize());
}
if (!downloadStartMarker.delete()) {
throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir);
}
loc.addSegment(segment);
retVal = storageDir;
} else {
retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
@ -179,18 +183,6 @@ public class OmniSegmentLoader implements SegmentLoader
}
}
private DataSegmentPuller getPuller(Map<String, Object> loadSpec) throws SegmentLoadingException
{
String type = MapUtils.getString(loadSpec, "type");
DataSegmentPuller loader = pullers.get(type);
if (loader == null) {
throw new SegmentLoadingException("Unknown loader type[%s]. Known types are %s", type, pullers.keySet());
}
return loader;
}
public void cleanupCacheFiles(File baseFile, File cacheFile) throws IOException
{
if (cacheFile.equals(baseFile)) {

View File

@ -0,0 +1,108 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.segment.loading;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector;
import com.fasterxml.jackson.databind.introspect.GuiceInjectableValues;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.common.IAE;
import io.druid.guice.GuiceInjectors;
import io.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Collection;
/**
*
*/
@RunWith(Parameterized.class)
public class LoadSpecTest
{
@Parameterized.Parameters
public static Collection<Object[]> getParameters()
{
return ImmutableList.<Object[]>of(
new Object[]{"{\"path\":\"/\",\"type\":\"local\"}", "local"}
);
}
private final String value;
private final String expectedId;
public LoadSpecTest(String value, String expectedId)
{
this.value = value;
this.expectedId = expectedId;
}
private static ObjectMapper mapper;
@BeforeClass
public static void setUp()
{
final Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.of(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(LocalDataSegmentPuller.class);
}
}
)
);
mapper = new DefaultObjectMapper();
mapper.registerModule( new SimpleModule("loadSpecTest").registerSubtypes(LocalLoadSpec.class));
mapper.setInjectableValues(new GuiceInjectableValues(injector));
final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
mapper.setAnnotationIntrospectors(
new AnnotationIntrospectorPair(
guiceIntrospector, mapper.getSerializationConfig().getAnnotationIntrospector()
),
new AnnotationIntrospectorPair(
guiceIntrospector, mapper.getDeserializationConfig().getAnnotationIntrospector()
)
);
}
@Test
public void testStringResolve() throws IOException
{
LoadSpec loadSpec = mapper.readValue(value, LoadSpec.class);
Assert.assertEquals(expectedId, loadSpec.getClass().getAnnotation(JsonTypeName.class).value());
}
}

View File

@ -0,0 +1,152 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.segment.loading;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.metamx.common.CompressionUtils;
import io.druid.jackson.DefaultObjectMapper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.zip.GZIPOutputStream;
/**
*
*/
public class LocalDataSegmentPullerTest
{
private File tmpDir;
private LocalDataSegmentPuller puller;
@Before
public void setup()
{
tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
puller = new LocalDataSegmentPuller();
}
@After
public void tearDown() throws IOException
{
deleteFiles(tmpDir);
}
public static void deleteFiles(File... files) throws IOException
{
IOException ex = null;
for (File file : files) {
if (file == null || !file.exists()) {
continue;
}
if (!file.delete()) {
IOException e = new IOException("Could not delete " + file.getAbsolutePath());
if (ex == null) {
ex = e;
} else {
ex.addSuppressed(e);
}
}
}
if (ex != null) {
throw ex;
}
}
@Test
public void simpleZipTest() throws IOException, SegmentLoadingException
{
File file = new File(tmpDir, "test1data");
File zipFile = File.createTempFile("ziptest", ".zip");
file.deleteOnExit();
zipFile.deleteOnExit();
zipFile.delete();
try {
try (OutputStream outputStream = new FileOutputStream(file)) {
outputStream.write(new byte[0]);
outputStream.flush();
}
CompressionUtils.zip(tmpDir, zipFile);
file.delete();
Assert.assertFalse(file.exists());
Assert.assertTrue(zipFile.exists());
puller.getSegmentFiles(zipFile, tmpDir);
Assert.assertTrue(file.exists());
}
finally {
deleteFiles(file, zipFile);
}
}
@Test
public void simpleGZTest() throws IOException, SegmentLoadingException
{
File zipFile = File.createTempFile("gztest", ".gz");
File unZipFile = new File(
tmpDir,
Files.getNameWithoutExtension(
zipFile.getAbsolutePath()
)
);
unZipFile.delete();
zipFile.deleteOnExit();
zipFile.delete();
try {
try (OutputStream fOutStream = new FileOutputStream(zipFile)) {
try (OutputStream outputStream = new GZIPOutputStream(fOutStream)) {
outputStream.write(new byte[0]);
outputStream.flush();
}
}
Assert.assertTrue(zipFile.exists());
Assert.assertFalse(unZipFile.exists());
puller.getSegmentFiles(zipFile, tmpDir);
Assert.assertTrue(unZipFile.exists());
}finally{
deleteFiles(zipFile, unZipFile);
}
}
@Test
public void simpleDirectoryTest() throws IOException, SegmentLoadingException
{
File srcDir = Files.createTempDir();
File tmpFile = File.createTempFile("test", "file", srcDir);
File expectedOutput = new File(tmpDir, Files.getNameWithoutExtension(tmpFile.getAbsolutePath()));
expectedOutput.delete();
try{
Assert.assertFalse(expectedOutput.exists());
puller.getSegmentFiles(srcDir, tmpDir);
Assert.assertTrue(expectedOutput.exists());
}finally{
deleteFiles(expectedOutput, tmpFile, srcDir);
}
}
}