register registererers in the config

commit 68e5adde33 (parent eb2b5a61fa)
Author: Fangjin Yang
Date: 2012-11-06 11:49:17 -08:00
5 changed files with 113 additions and 110 deletions

View File: Registererer.java

@@ -19,8 +19,6 @@
package com.metamx.druid.index.v1.serde;
import org.codehaus.jackson.annotate.JsonTypeInfo;
/**
* This is a "factory" interface for registering handlers in the system. It exists because I'm unaware of
* another way to register the complex serdes in the MR jobs that run on Hadoop. As such, instances of this interface
@@ -29,7 +27,6 @@ import org.codehaus.jackson.annotate.JsonTypeInfo;
*
* The name is not a typo, I felt that it needed an extra "er" to make the pronunciation that much more difficult.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public interface Registererer
{
public void register();
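For orientation, a minimal sketch of what an implementation of this interface could look like. The class name and body are illustrative assumptions, not part of this commit, and the sketch assumes register() is the interface's only method (only that method is visible in this hunk). The one requirement implied by the config change below is a public no-argument constructor, because HadoopDruidIndexerConfig.fromMap() creates registererers via Class.forName(...).newInstance() and then calls register().

// Hypothetical example, not part of this commit.
package com.example.druid;

import com.metamx.druid.index.v1.serde.Registererer;

public class ExampleComplexSerdeRegistererer implements Registererer
{
    // A public no-arg constructor is needed so that Class.forName(name).newInstance()
    // in HadoopDruidIndexerConfig.fromMap() can instantiate this class from its name alone.
    public ExampleComplexSerdeRegistererer() {}

    @Override
    public void register()
    {
        // Register whatever complex serdes/handlers this extension needs.
        // The concrete registration calls depend on the serde and are not shown in this diff.
    }
}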

View File: HadoopDruidIndexerConfig.java

@@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.MapUtils;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import com.metamx.druid.index.v1.serde.Registererer;
@@ -48,11 +49,13 @@ import org.apache.hadoop.mapreduce.Job;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collections;
@@ -82,16 +85,90 @@ public class HadoopDruidIndexerConfig
INVALID_ROW_COUNTER
}
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
{
if (argSpec.containsKey("registerers")) {
List<Registererer> registererers = Lists.transform(
MapUtils.getList(argSpec, "registerers"),
new Function<Object, Registererer>()
{
@Override
public Registererer apply(@Nullable Object input)
{
try {
return (Registererer) Class.forName((String) input).newInstance();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
for (Registererer registererer : registererers) {
registererer.register();
}
}
final HadoopDruidIndexerConfig retVal = jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
retVal.verify();
return retVal;
}
@SuppressWarnings("unchecked")
public static HadoopDruidIndexerConfig fromFile(File file)
{
try {
return fromMap(
(Map<String, Object>) jsonMapper.readValue(
file, new TypeReference<Map<String, Object>>()
{
}
)
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@SuppressWarnings("unchecked")
public static HadoopDruidIndexerConfig fromString(String str)
{
try {
return fromMap(
(Map<String, Object>) jsonMapper.readValue(
str, new TypeReference<Map<String, Object>>()
{
}
)
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)
{
return fromString(conf.get(CONFIG_PROPERTY));
}
private static final Logger log = new Logger(HadoopDruidIndexerConfig.class);
private static final String CONFIG_PROPERTY = "druid.indexer.config";
@Deprecated private volatile List<Interval> intervals;
@Deprecated
private volatile List<Interval> intervals;
private volatile String dataSource;
private volatile String timestampColumnName;
private volatile String timestampFormat;
private volatile DataSpec dataSpec;
@Deprecated private volatile Granularity segmentGranularity;
@Deprecated
private volatile Granularity segmentGranularity;
private volatile GranularitySpec granularitySpec;
private volatile PathSpec pathSpec;
private volatile String jobOutputDir;
@@ -106,7 +183,6 @@ public class HadoopDruidIndexerConfig
private volatile DataRollupSpec rollupSpec;
private volatile UpdaterJobSpec updaterJobSpec;
private volatile boolean ignoreInvalidRows = false;
private volatile List<Registererer> registererers;
public List<Interval> getIntervals()
{
@@ -353,16 +429,6 @@ public class HadoopDruidIndexerConfig
this.ignoreInvalidRows = ignoreInvalidRows;
}
@JsonProperty
public List<Registererer> getRegistererers()
{
return registererers;
}
public void setRegistererers(List<Registererer> registererers)
{
this.registererers = registererers;
}
/********************************************
Granularity/Bucket Helper Methods
********************************************/
@@ -376,7 +442,13 @@ public class HadoopDruidIndexerConfig
*/
public Optional<Bucket> getBucket(Map<String, String> theMap)
{
final Optional<Interval> timeBucket = getGranularitySpec().bucketInterval(new DateTime(theMap.get(getTimestampColumnName())));
final Optional<Interval> timeBucket = getGranularitySpec().bucketInterval(
new DateTime(
theMap.get(
getTimestampColumnName()
)
)
);
if (!timeBucket.isPresent()) {
return Optional.absent();
}
@@ -389,7 +461,13 @@ public class HadoopDruidIndexerConfig
for (final HadoopyShardSpec hadoopyShardSpec : shards) {
final ShardSpec actualSpec = hadoopyShardSpec.getActualSpec();
if (actualSpec.isInChunk(theMap)) {
return Optional.of(new Bucket(hadoopyShardSpec.getShardNum(), timeBucket.get().getStart(), actualSpec.getPartitionNum()));
return Optional.of(
new Bucket(
hadoopyShardSpec.getShardNum(),
timeBucket.get().getStart(),
actualSpec.getPartitionNum()
)
);
}
}
@@ -460,12 +538,14 @@ public class HadoopDruidIndexerConfig
{
final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();
return new Path(String.format(
"%s/%s_%s/partitions.json",
makeIntermediatePath(),
ISODateTimeFormat.basicDateTime().print(bucketInterval.getStart()),
ISODateTimeFormat.basicDateTime().print(bucketInterval.getEnd())
));
return new Path(
String.format(
"%s/%s_%s/partitions.json",
makeIntermediatePath(),
ISODateTimeFormat.basicDateTime().print(bucketInterval.getStart()),
ISODateTimeFormat.basicDateTime().print(bucketInterval.getEnd())
)
);
}
public Path makeSegmentOutputPath(Bucket bucket)
@@ -501,20 +581,6 @@ public class HadoopDruidIndexerConfig
}
}
public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)
{
try {
final HadoopDruidIndexerConfig retVal = jsonMapper.readValue(
conf.get(CONFIG_PROPERTY), HadoopDruidIndexerConfig.class
);
retVal.verify();
return retVal;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
public void verify()
{
try {
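To show how the new factory methods tie together, a hedged usage sketch follows. Only the "registerers" key, the from* entry points, and the "druid.indexer.config" property come from the diff above; the spec values and the registerer class name are placeholders, and a real spec must also supply the remaining required fields because fromMap() finishes with verify().

// Hypothetical usage, not part of this commit.
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.druid.indexer.HadoopDruidIndexerConfig;

import java.util.Map;

public class RegistererConfigExample
{
    public static void main(String[] args)
    {
        Map<String, Object> spec = ImmutableMap.<String, Object>of(
            "dataSource", "example_datasource",
            // Each entry is a fully qualified class name; fromMap() instantiates it via
            // Class.forName(...).newInstance() and calls register() before binding the config.
            "registerers", ImmutableList.of("com.example.druid.ExampleComplexSerdeRegistererer")
        );
        HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromMap(spec);

        // The other entry points all funnel into fromMap():
        //   HadoopDruidIndexerConfig.fromString(jsonString)                  -- JSON string -> Map -> fromMap()
        //   HadoopDruidIndexerConfig.fromFile(new java.io.File("spec.json")) -- JSON file -> Map -> fromMap()
        //   HadoopDruidIndexerConfig.fromConfiguration(hadoopConf)           -- reads conf.get("druid.indexer.config")
    }
}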

View File: HadoopDruidIndexerMain.java

@@ -37,8 +37,8 @@ public class HadoopDruidIndexerMain
HadoopDruidIndexerNode node = HadoopDruidIndexerNode.builder().build();
node.setIntervalSpec(args.length == 1 ? null : args[0]);
node.setArgumentSpec(args[args.length == 1 ? 0 : 1]);
node.setIntervalSpec(args.length == 1 ? null : args[0])
.setArgumentSpec(args[args.length == 1 ? 0 : 1]);
Lifecycle lifecycle = new Lifecycle();
lifecycle.addManagedInstance(node);

View File: HadoopDruidIndexerNode.java

@@ -1,34 +1,23 @@
package com.metamx.druid.indexer;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.jsontype.NamedType;
import org.codehaus.jackson.smile.SmileFactory;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.skife.config.ConfigurationObjectFactory;
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
/**
*/
public class HadoopDruidIndexerNode
{
private static final Logger log = new Logger(HadoopDruidIndexerNode.class);
private static final List<Pair<String, String>> expectedFields =
ImmutableList.<Pair<String, String>>builder()
.add(Pair.of("dataSource", "Name of dataSource"))
@@ -99,16 +88,9 @@ public class HadoopDruidIndexerNode
return new Builder();
}
private final ObjectMapper jsonMapper;
private String intervalSpec = null;
private String argumentSpec = null;
public HadoopDruidIndexerNode(ObjectMapper jsonMapper)
{
this.jsonMapper = jsonMapper;
}
public String getIntervalSpec()
{
return intervalSpec;
@@ -134,25 +116,27 @@ public class HadoopDruidIndexerNode
@SuppressWarnings("unchecked")
public HadoopDruidIndexerNode registerJacksonSubtype(Class<?>... clazzes)
{
jsonMapper.registerSubtypes(clazzes);
HadoopDruidIndexerConfig.jsonMapper.registerSubtypes(clazzes);
return this;
}
@SuppressWarnings("unchecked")
public HadoopDruidIndexerNode registerJacksonSubtype(NamedType... namedTypes)
{
jsonMapper.registerSubtypes(namedTypes);
HadoopDruidIndexerConfig.jsonMapper.registerSubtypes(namedTypes);
return this;
}
@LifecycleStart
public synchronized void start() throws Exception
public void start() throws Exception
{
Preconditions.checkNotNull(argumentSpec, "argumentSpec");
final HadoopDruidIndexerConfig config;
if (argumentSpec.startsWith("{")) {
config = jsonMapper.readValue(argumentSpec, HadoopDruidIndexerConfig.class);
config = HadoopDruidIndexerConfig.fromString(argumentSpec);
} else {
config = jsonMapper.readValue(new File(argumentSpec), HadoopDruidIndexerConfig.class);
config = HadoopDruidIndexerConfig.fromFile(new File(argumentSpec));
}
final List<Interval> dataInterval;
@@ -170,51 +154,15 @@ public class HadoopDruidIndexerNode
}
@LifecycleStop
public synchronized void stop()
public void stop()
{
}
public static class Builder
{
private ObjectMapper jsonMapper = null;
private ObjectMapper smileMapper = null;
private Lifecycle lifecycle = null;
private Properties props = null;
private ConfigurationObjectFactory configFactory = null;
public Builder withMapper(ObjectMapper jsonMapper)
{
this.jsonMapper = jsonMapper;
return this;
}
public HadoopDruidIndexerNode build()
{
if (jsonMapper == null && smileMapper == null) {
jsonMapper = new DefaultObjectMapper();
smileMapper = new DefaultObjectMapper(new SmileFactory());
smileMapper.getJsonFactory().setCodec(smileMapper);
} else if (jsonMapper == null || smileMapper == null) {
throw new ISE(
"Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.",
jsonMapper,
smileMapper
);
}
if (lifecycle == null) {
lifecycle = new Lifecycle();
}
if (props == null) {
props = Initialization.loadProperties();
}
if (configFactory == null) {
configFactory = Config.createFactory(props);
}
return new HadoopDruidIndexerNode(jsonMapper);
return new HadoopDruidIndexerNode();
}
}
}
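For completeness, a sketch of how the slimmed-down node is wired, mirroring the updated HadoopDruidIndexerMain above. The interval and spec-path values are placeholders; the builder, the chained setters, and the Lifecycle wiring appear in the Main and Node diffs, while lifecycle.start() is the usual com.metamx lifecycle pattern and is assumed here.

// Hypothetical wiring, not part of this commit.
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.druid.indexer.HadoopDruidIndexerNode;

public class NodeWiringExample
{
    public static void main(String[] args) throws Exception
    {
        HadoopDruidIndexerNode node = HadoopDruidIndexerNode.builder().build();
        node.setIntervalSpec("2012-01-01/2012-02-01")        // may be null when the spec itself defines the intervals
            .setArgumentSpec("/path/to/indexer-spec.json");  // a raw JSON string (starting with '{') also works

        // Jackson subtype registration now goes through the shared static HadoopDruidIndexerConfig.jsonMapper:
        // node.registerJacksonSubtype(SomeCustomDataSpec.class);  // hypothetical subtype class

        Lifecycle lifecycle = new Lifecycle();
        lifecycle.addManagedInstance(node);  // node.start() loads the config via the new from* factory methods
        lifecycle.start();
    }
}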

View File: IndexGeneratorJob.java

@@ -174,10 +174,6 @@ public class IndexGeneratorJob implements Jobby
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
parser = config.getDataSpec().getParser();
timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat());
for (Registererer registererer : config.getRegistererers()) {
registererer.register();
}
}
@Override
@@ -274,10 +270,6 @@ public class IndexGeneratorJob implements Jobby
}
timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat());
parser = config.getDataSpec().getParser();
for (Registererer registererer : config.getRegistererers()) {
registererer.register();
}
}
@Override