From 68e5adde33b7226f515eeb1a39940399cb9b5500 Mon Sep 17 00:00:00 2001
From: Fangjin Yang
Date: Tue, 6 Nov 2012 11:49:17 -0800
Subject: [PATCH] register registererers in the config

---
 .../druid/index/v1/serde/Registererer.java    |   3 -
 .../indexer/HadoopDruidIndexerConfig.java     | 136 +++++++++++++-----
 .../druid/indexer/HadoopDruidIndexerMain.java |   4 +-
 .../druid/indexer/HadoopDruidIndexerNode.java |  72 ++--------
 .../druid/indexer/IndexGeneratorJob.java      |   8 --
 5 files changed, 113 insertions(+), 110 deletions(-)

diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/serde/Registererer.java b/index-common/src/main/java/com/metamx/druid/index/v1/serde/Registererer.java
index df83ceb33d9..f560dfdc1e6 100644
--- a/index-common/src/main/java/com/metamx/druid/index/v1/serde/Registererer.java
+++ b/index-common/src/main/java/com/metamx/druid/index/v1/serde/Registererer.java
@@ -19,8 +19,6 @@
 
 package com.metamx.druid.index.v1.serde;
 
-import org.codehaus.jackson.annotate.JsonTypeInfo;
-
 /**
  * This is a "factory" interface for registering handlers in the system. It exists because I'm unaware of
  * another way to register the complex serdes in the MR jobs that run on Hadoop. As such, instances of this interface
@@ -29,7 +27,6 @@ import org.codehaus.jackson.annotate.JsonTypeInfo;
  *
  * The name is not a typo, I felt that it needed an extra "er" to make the pronunciation that much more difficult.
  */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 public interface Registererer
 {
   public void register();
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
index c93a4ebe671..f4d3d75c0df 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
@@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.metamx.common.Granularity;
 import com.metamx.common.ISE;
+import com.metamx.common.MapUtils;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.index.v1.serde.Registererer;
@@ -48,11 +49,13 @@ import org.apache.hadoop.mapreduce.Job;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.format.ISODateTimeFormat;
 
 import javax.annotation.Nullable;
+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.Collections;
@@ -82,16 +85,90 @@ public class HadoopDruidIndexerConfig
     INVALID_ROW_COUNTER
   }
 
+  public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
+  {
+    if (argSpec.containsKey("registerers")) {
+      List<Registererer> registererers = Lists.transform(
+          MapUtils.getList(argSpec, "registerers"),
+          new Function<Object, Registererer>()
+          {
+            @Override
+            public Registererer apply(@Nullable Object input)
+            {
+              try {
+                return (Registererer) Class.forName((String) input).newInstance();
+              }
+              catch (Exception e) {
+                throw Throwables.propagate(e);
+              }
+            }
+          }
+      );
+      for (Registererer registererer : registererers) {
+        registererer.register();
+      }
+    }
+
+    final HadoopDruidIndexerConfig retVal = jsonMapper.convertValue(
+        argSpec, new TypeReference<HadoopDruidIndexerConfig>()
+        {
+        }
+    );
+    retVal.verify();
+    return retVal;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static HadoopDruidIndexerConfig fromFile(File file)
+  {
+    try {
+      return fromMap(
+          (Map<String, Object>) jsonMapper.readValue(
+              file, new TypeReference<Map<String, Object>>()
+              {
+              }
+          )
+      );
+    }
+    catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public static HadoopDruidIndexerConfig fromString(String str)
+  {
+    try {
+      return fromMap(
+          (Map<String, Object>) jsonMapper.readValue(
+              str, new TypeReference<Map<String, Object>>()
+              {
+              }
+          )
+      );
+    }
+    catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)
+  {
+    return fromString(conf.get(CONFIG_PROPERTY));
+  }
+
   private static final Logger log = new Logger(HadoopDruidIndexerConfig.class);
   private static final String CONFIG_PROPERTY = "druid.indexer.config";
 
-  @Deprecated private volatile List<Interval> intervals;
+  @Deprecated
+  private volatile List<Interval> intervals;
   private volatile String dataSource;
   private volatile String timestampColumnName;
   private volatile String timestampFormat;
   private volatile DataSpec dataSpec;
-  @Deprecated private volatile Granularity segmentGranularity;
+  @Deprecated
+  private volatile Granularity segmentGranularity;
   private volatile GranularitySpec granularitySpec;
   private volatile PathSpec pathSpec;
   private volatile String jobOutputDir;
@@ -106,7 +183,6 @@ public class HadoopDruidIndexerConfig
   private volatile DataRollupSpec rollupSpec;
   private volatile UpdaterJobSpec updaterJobSpec;
   private volatile boolean ignoreInvalidRows = false;
-  private volatile List<Registererer> registererers;
 
   public List<Interval> getIntervals()
   {
@@ -353,16 +429,6 @@ public class HadoopDruidIndexerConfig
     this.ignoreInvalidRows = ignoreInvalidRows;
   }
 
-  @JsonProperty
-  public List<Registererer> getRegistererers()
-  {
-    return registererers;
-  }
-
-  public void setRegistererers(List<Registererer> registererers)
-  {
-    this.registererers = registererers;
-  }
 
   /********************************************
    Granularity/Bucket Helper Methods
   ********************************************/
@@ -376,7 +442,13 @@ public class HadoopDruidIndexerConfig
    */
   public Optional<Bucket> getBucket(Map<String, String> theMap)
   {
-    final Optional<Interval> timeBucket = getGranularitySpec().bucketInterval(new DateTime(theMap.get(getTimestampColumnName())));
+    final Optional<Interval> timeBucket = getGranularitySpec().bucketInterval(
+        new DateTime(
+            theMap.get(
+                getTimestampColumnName()
+            )
+        )
+    );
     if (!timeBucket.isPresent()) {
       return Optional.absent();
     }
@@ -389,7 +461,13 @@ public class HadoopDruidIndexerConfig
     for (final HadoopyShardSpec hadoopyShardSpec : shards) {
       final ShardSpec actualSpec = hadoopyShardSpec.getActualSpec();
       if (actualSpec.isInChunk(theMap)) {
-        return Optional.of(new Bucket(hadoopyShardSpec.getShardNum(), timeBucket.get().getStart(), actualSpec.getPartitionNum()));
+        return Optional.of(
+            new Bucket(
+                hadoopyShardSpec.getShardNum(),
+                timeBucket.get().getStart(),
+                actualSpec.getPartitionNum()
+            )
+        );
       }
     }
 
@@ -460,12 +538,14 @@ public class HadoopDruidIndexerConfig
   {
     final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();
 
-    return new Path(String.format(
-        "%s/%s_%s/partitions.json",
-        makeIntermediatePath(),
-        ISODateTimeFormat.basicDateTime().print(bucketInterval.getStart()),
-        ISODateTimeFormat.basicDateTime().print(bucketInterval.getEnd())
-    ));
+    return new Path(
+        String.format(
+            "%s/%s_%s/partitions.json",
+            makeIntermediatePath(),
+            ISODateTimeFormat.basicDateTime().print(bucketInterval.getStart()),
+            ISODateTimeFormat.basicDateTime().print(bucketInterval.getEnd())
+        )
+    );
   }
 
   public Path makeSegmentOutputPath(Bucket bucket)
@@ -501,20 +581,6 @@ public class HadoopDruidIndexerConfig
     }
   }
 
-  public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)
-  {
-    try {
-      final HadoopDruidIndexerConfig retVal = jsonMapper.readValue(
-          conf.get(CONFIG_PROPERTY), HadoopDruidIndexerConfig.class
-      );
-      retVal.verify();
-      return retVal;
-    }
-    catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
   public void verify()
   {
     try {
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerMain.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerMain.java
index bb5cc767fef..ce821fde9ee 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerMain.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerMain.java
@@ -37,8 +37,8 @@ public class HadoopDruidIndexerMain
 
     HadoopDruidIndexerNode node = HadoopDruidIndexerNode.builder().build();
 
-    node.setIntervalSpec(args.length == 1 ? null : args[0]);
-    node.setArgumentSpec(args[args.length == 1 ? 0 : 1]);
+    node.setIntervalSpec(args.length == 1 ? null : args[0])
+        .setArgumentSpec(args[args.length == 1 ? 0 : 1]);
 
     Lifecycle lifecycle = new Lifecycle();
     lifecycle.addManagedInstance(node);
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java
index 32efc183e7d..0b611c82b92 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java
@@ -1,34 +1,23 @@
 package com.metamx.druid.indexer;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import com.metamx.common.ISE;
 import com.metamx.common.Pair;
-import com.metamx.common.config.Config;
-import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
-import com.metamx.common.logger.Logger;
-import com.metamx.druid.initialization.Initialization;
-import com.metamx.druid.jackson.DefaultObjectMapper;
-import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.jsontype.NamedType;
-import org.codehaus.jackson.smile.SmileFactory;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
-import org.skife.config.ConfigurationObjectFactory;
 
 import java.io.File;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Properties;
 
 /**
  */
 public class HadoopDruidIndexerNode
 {
-  private static final Logger log = new Logger(HadoopDruidIndexerNode.class);
-
   private static final List<Pair<String, String>> expectedFields =
       ImmutableList.<Pair<String, String>>builder()
                    .add(Pair.of("dataSource", "Name of dataSource"))
@@ -99,16 +88,9 @@ public class HadoopDruidIndexerNode
     return new Builder();
   }
 
-  private final ObjectMapper jsonMapper;
-
   private String intervalSpec = null;
   private String argumentSpec = null;
 
-  public HadoopDruidIndexerNode(ObjectMapper jsonMapper)
-  {
-    this.jsonMapper = jsonMapper;
-  }
-
   public String getIntervalSpec()
   {
     return intervalSpec;
@@ -134,25 +116,27 @@ public class HadoopDruidIndexerNode
   @SuppressWarnings("unchecked")
   public HadoopDruidIndexerNode registerJacksonSubtype(Class<?>... clazzes)
   {
-    jsonMapper.registerSubtypes(clazzes);
+    HadoopDruidIndexerConfig.jsonMapper.registerSubtypes(clazzes);
     return this;
   }
 
   @SuppressWarnings("unchecked")
   public HadoopDruidIndexerNode registerJacksonSubtype(NamedType... namedTypes)
   {
-    jsonMapper.registerSubtypes(namedTypes);
+    HadoopDruidIndexerConfig.jsonMapper.registerSubtypes(namedTypes);
     return this;
   }
 
   @LifecycleStart
-  public synchronized void start() throws Exception
+  public void start() throws Exception
   {
+    Preconditions.checkNotNull(argumentSpec, "argumentSpec");
+
     final HadoopDruidIndexerConfig config;
     if (argumentSpec.startsWith("{")) {
-      config = jsonMapper.readValue(argumentSpec, HadoopDruidIndexerConfig.class);
+      config = HadoopDruidIndexerConfig.fromString(argumentSpec);
     } else {
-      config = jsonMapper.readValue(new File(argumentSpec), HadoopDruidIndexerConfig.class);
+      config = HadoopDruidIndexerConfig.fromFile(new File(argumentSpec));
     }
 
     final List<Interval> dataInterval;
@@ -170,51 +154,15 @@ public class HadoopDruidIndexerNode
   }
 
   @LifecycleStop
-  public synchronized void stop()
+  public void stop()
   {
   }
 
   public static class Builder
   {
-    private ObjectMapper jsonMapper = null;
-    private ObjectMapper smileMapper = null;
-    private Lifecycle lifecycle = null;
-    private Properties props = null;
-    private ConfigurationObjectFactory configFactory = null;
-
-    public Builder withMapper(ObjectMapper jsonMapper)
-    {
-      this.jsonMapper = jsonMapper;
-      return this;
-    }
-
     public HadoopDruidIndexerNode build()
     {
-      if (jsonMapper == null && smileMapper == null) {
-        jsonMapper = new DefaultObjectMapper();
-        smileMapper = new DefaultObjectMapper(new SmileFactory());
-        smileMapper.getJsonFactory().setCodec(smileMapper);
-      } else if (jsonMapper == null || smileMapper == null) {
-        throw new ISE(
-            "Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.",
-            jsonMapper,
-            smileMapper
-        );
-      }
-
-      if (lifecycle == null) {
-        lifecycle = new Lifecycle();
-      }
-
-      if (props == null) {
-        props = Initialization.loadProperties();
-      }
-
-      if (configFactory == null) {
-        configFactory = Config.createFactory(props);
-      }
-
-      return new HadoopDruidIndexerNode(jsonMapper);
+      return new HadoopDruidIndexerNode();
     }
   }
 }
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
index 73930562952..3674b3059ce 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
@@ -174,10 +174,6 @@ public class IndexGeneratorJob implements Jobby
       config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
       parser = config.getDataSpec().getParser();
       timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat());
-
-      for (Registererer registererer : config.getRegistererers()) {
-        registererer.register();
-      }
     }
 
     @Override
@@ -274,10 +270,6 @@ public class IndexGeneratorJob implements Jobby
       }
       timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat());
       parser = config.getDataSpec().getParser();
-
-      for (Registererer registererer : config.getRegistererers()) {
-        registererer.register();
-      }
     }
 
     @Override
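
Usage sketch (illustrative, not part of the patch): with this change, complex-metric serde registration is driven by a "registerers" list inside the indexer spec itself rather than by setRegistererers(). HadoopDruidIndexerConfig.fromMap() instantiates and register()s each listed class before the rest of the spec is deserialized, and the same path is reached from HadoopDruidIndexerNode.start() (via fromString/fromFile) and from the MR tasks (via fromConfiguration), which is why the explicit register() loops in IndexGeneratorJob are dropped. The class name and the abbreviated spec below are assumptions for illustration; a real spec would need every field that verify() requires.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.druid.indexer.HadoopDruidIndexerConfig;

import java.util.Map;

public class RegistererSpecExample
{
  public static void main(String[] args)
  {
    // Hypothetical spec fragment; the remaining required fields (dataSpec, pathSpec,
    // granularitySpec, etc.) are elided here, so verify() would reject this as-is.
    Map<String, Object> argSpec = ImmutableMap.<String, Object>of(
        "dataSource", "example_datasource",
        "registerers", ImmutableList.of("com.example.MyComplexMetricRegistererer")
    );

    // fromMap() walks the "registerers" list, does Class.forName(name).newInstance() on each
    // entry, calls register(), and only then converts the map into the config and verifies it.
    HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromMap(argSpec);
  }
}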