diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 25672e85cad..75d44a6ede8 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -53,6 +53,7 @@ import io.druid.timeline.DataSegment; import io.druid.timeline.partition.ShardSpec; import io.druid.timeline.partition.ShardSpecLookup; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; @@ -62,6 +63,8 @@ import org.joda.time.format.ISODateTimeFormat; import java.io.File; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; import java.nio.charset.Charset; import java.util.List; import java.util.Map; @@ -171,6 +174,28 @@ public class HadoopDruidIndexerConfig } } + @SuppressWarnings("unchecked") + public static HadoopDruidIndexerConfig fromDistributedFileSystem(String path) + { + try + { + Path pt = new Path(path); + FileSystem fs = pt.getFileSystem(new Configuration()); + Reader reader = new InputStreamReader(fs.open(pt)); + + return fromMap( + (Map) HadoopDruidIndexerConfig.jsonMapper.readValue( + reader, new TypeReference>() + { + } + ) + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf) { final HadoopDruidIndexerConfig retVal = fromString(conf.get(HadoopDruidIndexerConfig.CONFIG_PROPERTY)); diff --git a/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java b/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java index efc3206d441..03a28f02d13 100644 --- a/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java +++ b/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java @@ -40,6 +40,8 @@ import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec; import io.druid.metadata.MetadataStorageConnectorConfig; import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; import java.util.List; import java.util.Properties; @@ -116,7 +118,26 @@ public class CliInternalHadoopIndexer extends GuiceRunnable if (argumentSpec.startsWith("{")) { config = HadoopDruidIndexerConfig.fromString(argumentSpec); } else { - config = HadoopDruidIndexerConfig.fromFile(new File(argumentSpec)); + File localConfigFile = null; + + try { + final URI argumentSpecUri = new URI(argumentSpec); + final String argumentSpecScheme = argumentSpecUri.getScheme(); + + if (argumentSpecScheme == null || argumentSpecScheme.equals("file")) { + // File URI. + localConfigFile = new File(argumentSpecUri.getPath()); + } + } catch (URISyntaxException e) { + // Not a URI, assume it's a local file. + localConfigFile = new File(argumentSpec); + } + + if (localConfigFile != null) { + config = HadoopDruidIndexerConfig.fromFile(localConfigFile); + } else { + config = HadoopDruidIndexerConfig.fromDistributedFileSystem(argumentSpec); + } } } catch (Exception e) {