allow hadoop druid indexer to register registererers

This commit is contained in:
Fangjin Yang 2012-11-05 16:13:50 -08:00
parent a356447767
commit 7b2522ff3f
3 changed files with 51 additions and 11 deletions

View File

@ -27,10 +27,12 @@ import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import com.metamx.druid.index.v1.serde.Registererer;
import com.metamx.druid.indexer.data.DataSpec;
import com.metamx.druid.indexer.granularity.GranularitySpec;
import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
@ -50,6 +52,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collections;
@ -74,7 +77,10 @@ public class HadoopDruidIndexerConfig
jsonMapper.configure(JsonGenerator.Feature.ESCAPE_NON_ASCII, true);
}
public static enum IndexJobCounters { INVALID_ROW_COUNTER }
public static enum IndexJobCounters
{
INVALID_ROW_COUNTER
}
private static final Logger log = new Logger(HadoopDruidIndexerConfig.class);
@ -100,6 +106,7 @@ public class HadoopDruidIndexerConfig
private volatile DataRollupSpec rollupSpec;
private volatile UpdaterJobSpec updaterJobSpec;
private volatile boolean ignoreInvalidRows = false;
private volatile List<Registererer> registererers;
public List<Interval> getIntervals()
{
@ -114,7 +121,7 @@ public class HadoopDruidIndexerConfig
// For backwards compatibility
this.intervals = intervals;
if(this.segmentGranularity != null) {
if (this.segmentGranularity != null) {
this.granularitySpec = new UniformGranularitySpec(this.segmentGranularity, this.intervals);
}
}
@ -171,7 +178,7 @@ public class HadoopDruidIndexerConfig
// For backwards compatibility
this.segmentGranularity = segmentGranularity;
if(this.intervals != null) {
if (this.intervals != null) {
this.granularitySpec = new UniformGranularitySpec(this.segmentGranularity, this.intervals);
}
}
@ -184,8 +191,11 @@ public class HadoopDruidIndexerConfig
public void setGranularitySpec(GranularitySpec granularitySpec)
{
Preconditions.checkState(this.intervals == null, "Use setGranularitySpec instead of setIntervals");
Preconditions.checkState(this.segmentGranularity == null, "Use setGranularitySpec instead of setSegmentGranularity");
Preconditions.checkState(this.intervals == null, "Use setGranularitySpec instead of setIntervals");
Preconditions.checkState(
this.segmentGranularity == null,
"Use setGranularitySpec instead of setSegmentGranularity"
);
this.granularitySpec = granularitySpec;
}
@ -343,9 +353,34 @@ public class HadoopDruidIndexerConfig
this.ignoreInvalidRows = ignoreInvalidRows;
}
/********************************************
Granularity/Bucket Helper Methods
********************************************/
@JsonProperty
public List<Registererer> getRegistererers()
{
return registererers;
}
public void setRegistererers(List<String> registererers)
{
this.registererers = Lists.transform(
registererers,
new Function<String, Registererer>()
{
@Override
public Registererer apply(@Nullable String input)
{
try {
return (Registererer) Class.forName(input).newInstance();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
/********************************************
Granularity/Bucket Helper Methods
********************************************/
/**
* Get the proper bucket for this "row"

View File

@ -28,7 +28,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import com.google.common.io.Closeables;
import com.google.common.primitives.Longs;
import com.metamx.common.ISE;
@ -43,7 +42,7 @@ import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.index.v1.MMappedIndex;
import com.metamx.druid.index.v1.serde.ComplexMetrics;
import com.metamx.druid.index.v1.serde.Registererer;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.indexer.rollup.DataRollupSpec;
import com.metamx.druid.jackson.DefaultObjectMapper;
@ -271,6 +270,10 @@ public class IndexGeneratorJob implements Jobby
}
timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat());
parser = config.getDataSpec().getParser();
for (Registererer registererer : config.getRegistererers()) {
registererer.register();
}
}
@Override

View File

@ -195,7 +195,9 @@ public class RemoteTaskRunner implements TaskRunner
)
);
currentlyTerminating.addAll(terminated.getNodeIds());
if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds());
}
} else {
log.info(
"[%s] still terminating. Wait for all nodes to terminate before trying again.",