Merge pull request #1465 from gianm/hadoop-indexer-config-hdfs

Read the Hadoop indexer configuration file from HDFS
This commit is contained in:
Charles Allen 2015-07-06 11:25:13 -07:00
commit 66d105940d
2 changed files with 47 additions and 1 deletions

View File

@ -53,6 +53,7 @@ import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec; import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.ShardSpecLookup; import io.druid.timeline.partition.ShardSpecLookup;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
@ -62,6 +63,8 @@ import org.joda.time.format.ISODateTimeFormat;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -171,6 +174,28 @@ public class HadoopDruidIndexerConfig
} }
} }
@SuppressWarnings("unchecked")
/**
 * Reads a Hadoop indexer configuration from a filesystem path resolvable by
 * Hadoop (e.g. an hdfs:// URI), parses it as a JSON map, and builds a
 * {@link HadoopDruidIndexerConfig} from it via {@code fromMap}.
 *
 * @param path filesystem path or URI of the JSON configuration file
 * @return the parsed configuration
 * @throws RuntimeException wrapping any I/O or parse failure
 */
public static HadoopDruidIndexerConfig fromDistributedFileSystem(String path)
{
  try {
    final Path pt = new Path(path);
    final FileSystem fs = pt.getFileSystem(new Configuration());
    // try-with-resources: the original leaked the HDFS input stream/reader on
    // every path. Also decode explicitly as UTF-8 (JSON's default encoding)
    // instead of the platform-default charset.
    try (Reader reader = new InputStreamReader(fs.open(pt), Charset.forName("UTF-8"))) {
      return fromMap(
          (Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
              reader, new TypeReference<Map<String, Object>>()
              {
              }
          )
      );
    }
  }
  catch (Exception e) {
    throw Throwables.propagate(e);
  }
}
public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf) public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)
{ {
final HadoopDruidIndexerConfig retVal = fromString(conf.get(HadoopDruidIndexerConfig.CONFIG_PROPERTY)); final HadoopDruidIndexerConfig retVal = fromString(conf.get(HadoopDruidIndexerConfig.CONFIG_PROPERTY));

View File

@ -40,6 +40,8 @@ import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec;
import io.druid.metadata.MetadataStorageConnectorConfig; import io.druid.metadata.MetadataStorageConnectorConfig;
import java.io.File; import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
@ -116,7 +118,26 @@ public class CliInternalHadoopIndexer extends GuiceRunnable
if (argumentSpec.startsWith("{")) { if (argumentSpec.startsWith("{")) {
config = HadoopDruidIndexerConfig.fromString(argumentSpec); config = HadoopDruidIndexerConfig.fromString(argumentSpec);
} else { } else {
config = HadoopDruidIndexerConfig.fromFile(new File(argumentSpec)); File localConfigFile = null;
try {
final URI argumentSpecUri = new URI(argumentSpec);
final String argumentSpecScheme = argumentSpecUri.getScheme();
if (argumentSpecScheme == null || argumentSpecScheme.equals("file")) {
// File URI.
localConfigFile = new File(argumentSpecUri.getPath());
}
} catch (URISyntaxException e) {
// Not a URI, assume it's a local file.
localConfigFile = new File(argumentSpec);
}
if (localConfigFile != null) {
config = HadoopDruidIndexerConfig.fromFile(localConfigFile);
} else {
config = HadoopDruidIndexerConfig.fromDistributedFileSystem(argumentSpec);
}
} }
} }
catch (Exception e) { catch (Exception e) {