Fixes #112, paths specified as "s3://" when given to HadoopDruidIndexerMain will be converted to "s3n://" and loaded via Hadoop's default file system for s3n.

This commit is contained in:
cheddar 2013-04-30 12:58:36 -05:00
parent 60b279b0d3
commit e1367f256b
1 changed files with 21 additions and 1 deletions

View File

@ -22,12 +22,18 @@ package com.metamx.druid.indexer;
import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.CharStreams;
import com.google.common.io.InputSupplier;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -87,6 +93,20 @@ public class HadoopDruidIndexerNode
final HadoopDruidIndexerConfig config; final HadoopDruidIndexerConfig config;
if (argumentSpec.startsWith("{")) { if (argumentSpec.startsWith("{")) {
config = HadoopDruidIndexerConfig.fromString(argumentSpec); config = HadoopDruidIndexerConfig.fromString(argumentSpec);
} else if (argumentSpec.startsWith("s3://")) {
final Path s3nPath = new Path(String.format("s3n://%s", argumentSpec.substring("s3://".length())));
final FileSystem fs = s3nPath.getFileSystem(new Configuration());
String configString = CharStreams.toString(new InputSupplier<Readable>()
{
@Override
public Readable getInput() throws IOException
{
return new InputStreamReader(fs.open(s3nPath));
}
});
config = HadoopDruidIndexerConfig.fromString(configString);
} else { } else {
config = HadoopDruidIndexerConfig.fromFile(new File(argumentSpec)); config = HadoopDruidIndexerConfig.fromFile(new File(argumentSpec));
} }