Merge pull request #1472 from himanshug/combiner

Use Combiner to merge InputRows at the Mapper during Hadoop Batch Ingestion
Fangjin Yang 2015-07-20 13:33:10 -07:00
commit c4ed8fee56
20 changed files with 949 additions and 101 deletions
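In short, this change has IndexGeneratorMapper emit each row as a BytesWritable produced by the new InputRowSerde and, when the new useCombiner tuning flag is set, installs an IndexGeneratorCombiner that partially aggregates rows sharing a bucket key before the shuffle; IndexGeneratorReducer then deserializes those bytes instead of re-parsing raw text. Below is a condensed, hedged sketch of the wiring added in IndexGeneratorJob (class and method names are taken from this diff; the surrounding job setup is omitted):

// Condensed from IndexGeneratorJob.run() in this diff: map output becomes
// BytesWritable, and the combiner is installed only when useCombiner is true.
job.setMapperClass(IndexGeneratorMapper.class);
job.setMapOutputValueClass(BytesWritable.class);

if (config.getSchema().getTuningConfig().getUseCombiner()) {
  job.setCombinerClass(IndexGeneratorCombiner.class);
  job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class);
}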

View File

@ -235,7 +235,7 @@ public class DetermineHashedPartitionsJob implements Jobby
@Override
protected void innerMap(
InputRow inputRow,
Writable value,
Object value,
Context context
) throws IOException, InterruptedException
{

View File

@ -259,7 +259,7 @@ public class DeterminePartitionsJob implements Jobby
@Override
protected void innerMap(
InputRow inputRow,
Writable value,
Object value,
Context context
) throws IOException, InterruptedException
{
@ -340,7 +340,7 @@ public class DeterminePartitionsJob implements Jobby
@Override
protected void innerMap(
InputRow inputRow,
Writable value,
Object value,
Context context
) throws IOException, InterruptedException
{
@ -378,7 +378,7 @@ public class DeterminePartitionsJob implements Jobby
}
public void emitDimValueCounts(
TaskInputOutputContext<? extends Writable, ? extends Writable, BytesWritable, Text> context,
TaskInputOutputContext<?, ?, BytesWritable, Text> context,
DateTime timestamp,
Map<String, Iterable<String>> dims
) throws IOException, InterruptedException
@ -891,7 +891,7 @@ public class DeterminePartitionsJob implements Jobby
}
private static void write(
TaskInputOutputContext<? extends Writable, ? extends Writable, BytesWritable, Text> context,
TaskInputOutputContext<?, ?, BytesWritable, Text> context,
final byte[] groupKey,
DimValueCount dimValueCount
)

View File

@ -41,11 +41,11 @@ import org.joda.time.DateTime;
import com.metamx.common.RE;
public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Writable, Writable, KEYOUT, VALUEOUT>
public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Object, Object, KEYOUT, VALUEOUT>
{
private static final Logger log = new Logger(HadoopDruidIndexerMapper.class);
private HadoopDruidIndexerConfig config;
protected HadoopDruidIndexerConfig config;
private InputRowParser parser;
protected GranularitySpec granularitySpec;
@ -70,7 +70,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
@Override
protected void map(
Writable key, Writable value, Context context
Object key, Object value, Context context
) throws IOException, InterruptedException
{
try {
@ -99,7 +99,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
}
}
public final static InputRow parseInputRow(Writable value, InputRowParser parser)
public final static InputRow parseInputRow(Object value, InputRowParser parser)
{
if(parser instanceof StringInputRowParser && value instanceof Text) {
//Note: This is to ensure backward compatibility with 0.7.0 and before
@ -109,7 +109,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
}
}
abstract protected void innerMap(InputRow inputRow, Writable value, Context context)
abstract protected void innerMap(InputRow inputRow, Object value, Context context)
throws IOException, InterruptedException;
}

View File

@ -42,6 +42,7 @@ public class HadoopTuningConfig implements TuningConfig
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
private static final int DEFAULT_BUFFER_SIZE = 128 * 1024 * 1024;
private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f;
private static final boolean DEFAULT_USE_COMBINER = false;
public static HadoopTuningConfig makeDefaultTuningConfig()
{
@ -61,7 +62,8 @@ public class HadoopTuningConfig implements TuningConfig
false,
false,
DEFAULT_BUFFER_SIZE,
DEFAULT_AGG_BUFFER_RATIO
DEFAULT_AGG_BUFFER_RATIO,
DEFAULT_USE_COMBINER
);
}
@ -81,6 +83,7 @@ public class HadoopTuningConfig implements TuningConfig
private final boolean ingestOffheap;
private final int bufferSize;
private final float aggregationBufferRatio;
private final boolean useCombiner;
@JsonCreator
public HadoopTuningConfig(
@ -99,7 +102,8 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("persistInHeap") boolean persistInHeap,
final @JsonProperty("ingestOffheap") boolean ingestOffheap,
final @JsonProperty("bufferSize") Integer bufferSize,
final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio
final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio,
final @JsonProperty("useCombiner") Boolean useCombiner
)
{
this.workingPath = workingPath;
@ -120,6 +124,7 @@ public class HadoopTuningConfig implements TuningConfig
this.ingestOffheap = ingestOffheap;
this.bufferSize = bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize;
this.aggregationBufferRatio = aggregationBufferRatio == null ? DEFAULT_AGG_BUFFER_RATIO : aggregationBufferRatio;
this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue();
}
@JsonProperty
@ -216,6 +221,12 @@ public class HadoopTuningConfig implements TuningConfig
return aggregationBufferRatio;
}
@JsonProperty
public boolean getUseCombiner()
{
return useCombiner;
}
public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
@ -234,7 +245,8 @@ public class HadoopTuningConfig implements TuningConfig
persistInHeap,
ingestOffheap,
bufferSize,
aggregationBufferRatio
aggregationBufferRatio,
useCombiner
);
}
@ -256,7 +268,8 @@ public class HadoopTuningConfig implements TuningConfig
persistInHeap,
ingestOffheap,
bufferSize,
aggregationBufferRatio
aggregationBufferRatio,
useCombiner
);
}
@ -278,7 +291,8 @@ public class HadoopTuningConfig implements TuningConfig
persistInHeap,
ingestOffheap,
bufferSize,
aggregationBufferRatio
aggregationBufferRatio,
useCombiner
);
}
}
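For reference, a hedged sketch of constructing a config with the combiner enabled; the argument order follows the @JsonCreator constructor above, the per-argument comments are only a readability guide, and the values are illustrative. In a JSON ingestion spec the same switch is simply the "useCombiner" property of the tuningConfig.

// Illustrative values; the final argument is the new useCombiner flag,
// which defaults to false when the JSON property is omitted.
HadoopTuningConfig tuning = new HadoopTuningConfig(
    "/tmp/workingpath",        // workingPath
    "version",                 // version
    null,                      // partitionsSpec (default)
    null,                      // shardSpecs (default)
    null,                      // indexSpec (default)
    80000,                     // rowFlushBoundary
    false,                     // leaveIntermediate
    true,                      // cleanupOnFailure
    false,                     // overwriteFiles
    false,                     // ignoreInvalidRows
    null,                      // jobProperties (default)
    false,                     // combineText
    false,                     // persistInHeap
    false,                     // ingestOffheap
    128 * 1024 * 1024,         // bufferSize
    0.5f,                      // aggregationBufferRatio
    true                       // useCombiner
);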

View File

@ -31,11 +31,10 @@ import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.ParseException;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.data.input.impl.InputRowParser;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
@ -65,11 +64,13 @@ import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@ -141,7 +142,7 @@ public class IndexGeneratorJob implements Jobby
JobHelper.setInputFormat(job, config);
job.setMapperClass(IndexGeneratorMapper.class);
job.setMapOutputValueClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
SortableBytes.useSortableBytesAsMapOutputKey(job);
@ -149,6 +150,12 @@ public class IndexGeneratorJob implements Jobby
if (numReducers == 0) {
throw new RuntimeException("No buckets?? seems there is no data to index.");
}
if(config.getSchema().getTuningConfig().getUseCombiner()) {
job.setCombinerClass(IndexGeneratorCombiner.class);
job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class);
}
job.setNumReduceTasks(numReducers);
job.setPartitionerClass(IndexGeneratorPartitioner.class);
@ -193,14 +200,54 @@ public class IndexGeneratorJob implements Jobby
}
}
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Writable>
private static IncrementalIndex makeIncrementalIndex(
Bucket theBucket,
AggregatorFactory[] aggs,
HadoopDruidIndexerConfig config,
boolean isOffHeap,
StupidPool bufferPool
)
{
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(theBucket.time.getMillis())
.withDimensionsSpec(config.getSchema().getDataSchema().getParser())
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
.withMetrics(aggs)
.build();
if (isOffHeap) {
return new OffheapIncrementalIndex(
indexSchema,
bufferPool,
true,
tuningConfig.getBufferSize()
);
} else {
return new OnheapIncrementalIndex(
indexSchema,
tuningConfig.getRowFlushBoundary()
);
}
}
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, BytesWritable>
{
private static final HashFunction hashFunction = Hashing.murmur3_128();
private AggregatorFactory[] aggregators;
@Override
protected void setup(Context context)
throws IOException, InterruptedException
{
super.setup(context);
aggregators = config.getSchema().getDataSchema().getAggregators();
}
@Override
protected void innerMap(
InputRow inputRow,
Writable value,
Object value,
Context context
) throws IOException, InterruptedException
{
@ -230,11 +277,133 @@ public class IndexGeneratorJob implements Jobby
.put(hashedDimensions)
.array()
).toBytesWritable(),
value
new BytesWritable(InputRowSerde.toBytes(inputRow, aggregators))
);
}
}
public static class IndexGeneratorCombiner extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>
{
private HadoopDruidIndexerConfig config;
private AggregatorFactory[] aggregators;
private AggregatorFactory[] combiningAggs;
@Override
protected void setup(Context context)
throws IOException, InterruptedException
{
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
aggregators = config.getSchema().getDataSchema().getAggregators();
combiningAggs = new AggregatorFactory[aggregators.length];
for (int i = 0; i < aggregators.length; ++i) {
combiningAggs[i] = aggregators[i].getCombiningFactory();
}
}
@Override
protected void reduce(
final BytesWritable key, Iterable<BytesWritable> values, final Context context
) throws IOException, InterruptedException
{
Iterator<BytesWritable> iter = values.iterator();
BytesWritable first = iter.next();
if(iter.hasNext()) {
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, false, null);
index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators));
while(iter.hasNext()) {
context.progress();
InputRow value = InputRowSerde.fromBytes(iter.next().getBytes(), aggregators);
if(!index.canAppendRow()) {
log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason());
flushIndexToContextAndClose(key, index, context);
index = makeIncrementalIndex(bucket, combiningAggs, config, false, null);
}
index.add(value);
}
flushIndexToContextAndClose(key, index, context);
} else {
context.write(key, first);
}
}
private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex index, Context context) throws IOException, InterruptedException
{
Iterator<Row> rows = index.iterator();
while(rows.hasNext()) {
context.progress();
Row row = rows.next();
InputRow inputRow = getInputRowFromRow(row, index.getDimensions());
context.write(
key,
new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs))
);
}
index.close();
}
private InputRow getInputRowFromRow(final Row row, final List<String> dimensions) {
return new InputRow()
{
@Override
public List<String> getDimensions()
{
return dimensions;
}
@Override
public long getTimestampFromEpoch()
{
return row.getTimestampFromEpoch();
}
@Override
public DateTime getTimestamp()
{
return row.getTimestamp();
}
@Override
public List<String> getDimension(String dimension)
{
return row.getDimension(dimension);
}
@Override
public Object getRaw(String dimension)
{
return row.getRaw(dimension);
}
@Override
public float getFloatMetric(String metric)
{
return row.getFloatMetric(metric);
}
@Override
public long getLongMetric(String metric)
{
return row.getLongMetric(metric);
}
@Override
public int compareTo(Row o)
{
return row.compareTo(o);
}
};
}
}
public static class IndexGeneratorPartitioner extends Partitioner<BytesWritable, Writable> implements Configurable
{
private Configuration config;
@ -269,11 +438,12 @@ public class IndexGeneratorJob implements Jobby
}
}
public static class IndexGeneratorReducer extends Reducer<BytesWritable, Writable, BytesWritable, Text>
public static class IndexGeneratorReducer extends Reducer<BytesWritable, BytesWritable, BytesWritable, Text>
{
protected HadoopDruidIndexerConfig config;
private List<String> metricNames = Lists.newArrayList();
private InputRowParser parser;
private AggregatorFactory[] aggregators;
private AggregatorFactory[] combiningAggs;
protected ProgressIndicator makeProgressIndicator(final Context context)
{
@ -317,29 +487,35 @@ public class IndexGeneratorJob implements Jobby
{
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
for (AggregatorFactory factory : config.getSchema().getDataSchema().getAggregators()) {
metricNames.add(factory.getName());
aggregators = config.getSchema().getDataSchema().getAggregators();
combiningAggs = new AggregatorFactory[aggregators.length];
for (int i = 0; i < aggregators.length; ++i) {
metricNames.add(aggregators[i].getName());
combiningAggs[i] = aggregators[i].getCombiningFactory();
}
parser = config.getParser();
}
@Override
protected void reduce(
BytesWritable key, Iterable<Writable> values, final Context context
BytesWritable key, Iterable<BytesWritable> values, final Context context
) throws IOException, InterruptedException
{
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators();
final int maxTotalBufferSize = config.getSchema().getTuningConfig().getBufferSize();
final int aggregationBufferSize = (int) ((double) maxTotalBufferSize
* config.getSchema().getTuningConfig().getAggregationBufferRatio());
final StupidPool<ByteBuffer> bufferPool = new OffheapBufferPool(aggregationBufferSize);
IncrementalIndex index = makeIncrementalIndex(bucket, aggs, bufferPool);
IncrementalIndex index = makeIncrementalIndex(
bucket,
combiningAggs,
config,
config.getSchema().getTuningConfig().isIngestOffheap(),
bufferPool
);
try {
File baseFlushFile = File.createTempFile("base", "flush");
baseFlushFile.delete();
@ -354,24 +530,13 @@ public class IndexGeneratorJob implements Jobby
Set<String> allDimensionNames = Sets.newHashSet();
final ProgressIndicator progressIndicator = makeProgressIndicator(context);
for (final Writable value : values) {
for (final BytesWritable bw : values) {
context.progress();
int numRows;
try {
final InputRow inputRow = index.formatRow(HadoopDruidIndexerMapper.parseInputRow(value, parser));
allDimensionNames.addAll(inputRow.getDimensions());
numRows = index.add(inputRow);
}
catch (ParseException e) {
if (config.isIgnoreInvalidRows()) {
log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
continue;
} else {
throw e;
}
}
final InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(bw.getBytes(), aggregators));
allDimensionNames.addAll(inputRow.getDimensions());
int numRows = index.add(inputRow);
++lineCount;
if (!index.canAppendRow()) {
@ -391,8 +556,14 @@ public class IndexGeneratorJob implements Jobby
persist(index, interval, file, progressIndicator);
// close this index and make a new one, reusing same buffer
index.close();
index = makeIncrementalIndex(bucket, aggs, bufferPool);
index = makeIncrementalIndex(
bucket,
combiningAggs,
config,
config.getSchema().getTuningConfig().isIngestOffheap(),
bufferPool
);
startTime = System.currentTimeMillis();
++indexCount;
}
@ -421,7 +592,7 @@ public class IndexGeneratorJob implements Jobby
indexes.add(IndexIO.loadIndex(file));
}
mergedBase = mergeQueryableIndex(
indexes, aggs, new File(baseFlushFile, "merged"), progressIndicator
indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator
);
}
final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath())
@ -475,31 +646,6 @@ public class IndexGeneratorJob implements Jobby
index.close();
}
}
private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs, StupidPool bufferPool)
{
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(theBucket.time.getMillis())
.withDimensionsSpec(config.getSchema().getDataSchema().getParser())
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
.withMetrics(aggs)
.build();
if (tuningConfig.isIngestOffheap()) {
return new OffheapIncrementalIndex(
indexSchema,
bufferPool,
true,
tuningConfig.getBufferSize()
);
} else {
return new OnheapIncrementalIndex(
indexSchema,
tuningConfig.getRowFlushBoundary()
);
}
}
}
public static class IndexGeneratorOutputFormat extends TextOutputFormat

View File

@ -0,0 +1,228 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
*/
public class InputRowSerde
{
private static final Logger log = new Logger(InputRowSerde.class);
private static final Text[] EMPTY_TEXT_ARRAY = new Text[0];
public static final byte[] toBytes(final InputRow row, AggregatorFactory[] aggs)
{
try {
ByteArrayDataOutput out = ByteStreams.newDataOutput();
//write timestamp
out.writeLong(row.getTimestampFromEpoch());
//writing all dimensions
List<String> dimList = row.getDimensions();
Text[] dims = EMPTY_TEXT_ARRAY;
if(dimList != null) {
dims = new Text[dimList.size()];
for (int i = 0; i < dims.length; i++) {
dims[i] = new Text(dimList.get(i));
}
}
StringArrayWritable sw = new StringArrayWritable(dims);
sw.write(out);
MapWritable mw = new MapWritable();
if(dimList != null) {
for (String dim : dimList) {
List<String> dimValue = row.getDimension(dim);
if (dimValue == null || dimValue.size() == 0) {
continue;
}
if (dimValue.size() == 1) {
mw.put(new Text(dim), new Text(dimValue.get(0)));
} else {
Text[] dimValueArr = new Text[dimValue.size()];
for (int i = 0; i < dimValueArr.length; i++) {
dimValueArr[i] = new Text(dimValue.get(i));
}
mw.put(new Text(dim), new StringArrayWritable(dimValueArr));
}
}
}
//writing all metrics
Supplier<InputRow> supplier = new Supplier<InputRow>()
{
@Override
public InputRow get()
{
return row;
}
};
for (AggregatorFactory aggFactory : aggs) {
String k = aggFactory.getName();
Aggregator agg = aggFactory.factorize(
IncrementalIndex.makeColumnSelectorFactory(
aggFactory,
supplier,
true
)
);
agg.aggregate();
String t = aggFactory.getTypeName();
if (t.equals("float")) {
mw.put(new Text(k), new FloatWritable(agg.getFloat()));
} else if (t.equals("long")) {
mw.put(new Text(k), new LongWritable(agg.getLong()));
} else {
//it's a complex metric
Object val = agg.get();
ComplexMetricSerde serde = getComplexMetricSerde(t);
mw.put(new Text(k), new BytesWritable(serde.toBytes(val)));
}
}
mw.write(out);
return out.toByteArray();
} catch(IOException ex) {
throw Throwables.propagate(ex);
}
}
public static final InputRow fromBytes(byte[] data, AggregatorFactory[] aggs)
{
try {
DataInput in = ByteStreams.newDataInput(data);
//Read timestamp
long timestamp = in.readLong();
//Read dimensions
StringArrayWritable sw = new StringArrayWritable();
sw.readFields(in);
List<String> dimensions = Arrays.asList(sw.toStrings());
MapWritable mw = new MapWritable();
mw.readFields(in);
Map<String, Object> event = Maps.newHashMap();
for (String d : dimensions) {
Writable v = mw.get(new Text(d));
if (v == null) {
continue;
}
if (v instanceof Text) {
event.put(d, ((Text) v).toString());
} else if (v instanceof StringArrayWritable) {
event.put(d, Arrays.asList(((StringArrayWritable) v).toStrings()));
} else {
throw new ISE("unknown dim value type %s", v.getClass().getName());
}
}
//Read metrics
for (AggregatorFactory aggFactory : aggs) {
String k = aggFactory.getName();
Writable v = mw.get(new Text(k));
if (v == null) {
continue;
}
String t = aggFactory.getTypeName();
if (t.equals("float")) {
event.put(k, ((FloatWritable) v).get());
} else if (t.equals("long")) {
event.put(k, ((LongWritable) v).get());
} else {
//it's a complex metric
ComplexMetricSerde serde = getComplexMetricSerde(t);
BytesWritable bw = (BytesWritable) v;
event.put(k, serde.fromBytes(bw.getBytes(), 0, bw.getLength()));
}
}
return new MapBasedInputRow(timestamp, dimensions, event);
} catch(IOException ex) {
throw Throwables.propagate(ex);
}
}
private static ComplexMetricSerde getComplexMetricSerde(String type)
{
ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(type);
if (serde == null) {
throw new IAE("Unknown type[%s]", type);
}
return serde;
}
}
class StringArrayWritable extends ArrayWritable
{
public StringArrayWritable()
{
super(Text.class);
}
public StringArrayWritable(Text[] strs)
{
super(Text.class, strs);
}
}
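A hedged usage sketch for the new InputRowSerde (serde and row class names are from this diff; the example class, metric names, and values are illustrative). Note that toBytes() runs the aggregators over the single row, so the deserialized row carries the aggregated metric name rather than the raw input column:

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.indexer.InputRowSerde;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;

public class InputRowSerdeExample
{
  public static void main(String[] args)
  {
    AggregatorFactory[] aggs = new AggregatorFactory[]{
        new LongSumAggregatorFactory("visited_sum", "visited")
    };

    InputRow row = new MapBasedInputRow(
        System.currentTimeMillis(),
        ImmutableList.of("host"),
        ImmutableMap.<String, Object>of("host", "a.example.com", "visited", 10)
    );

    byte[] bytes = InputRowSerde.toBytes(row, aggs);       // timestamp + dims + aggregated metrics
    InputRow back = InputRowSerde.fromBytes(bytes, aggs);  // a MapBasedInputRow again

    System.out.println(back.getDimension("host"));         // [a.example.com]
    System.out.println(back.getLongMetric("visited_sum")); // 10
  }
}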

View File

@ -153,7 +153,8 @@ public class DetermineHashedPartitionsJobTest
false,
false,
null,
null
null,
false
)
);
this.indexerConfig = new HadoopDruidIndexerConfig(ingestionSpec);

View File

@ -262,7 +262,8 @@ public class DeterminePartitionsJobTest
false,
false,
null,
null
null,
false
)
)
);

View File

@ -197,7 +197,8 @@ public class HadoopDruidIndexerConfigTest
false,
false,
null,
null
null,
false
)
);
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(spec);

View File

@ -0,0 +1,93 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.IndexSpec;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
/**
*/
public class HadoopTuningConfigTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testSerde() throws Exception
{
HadoopTuningConfig expected = new HadoopTuningConfig(
"/tmp/workingpath",
"version",
null,
null,
null,
100,
true,
true,
true,
true,
null,
true,
true,
true,
200,
0.1f,
true
);
HadoopTuningConfig actual = jsonReadWriteRead(jsonMapper.writeValueAsString(expected), HadoopTuningConfig.class);
Assert.assertEquals("/tmp/workingpath", actual.getWorkingPath());
Assert.assertEquals("version", actual.getVersion());
Assert.assertNotNull(actual.getPartitionsSpec());
Assert.assertEquals(ImmutableMap.<DateTime, List<HadoopyShardSpec>>of(), actual.getShardSpecs());
Assert.assertEquals(new IndexSpec(), actual.getIndexSpec());
Assert.assertEquals(100, actual.getRowFlushBoundary());
Assert.assertEquals(true, actual.isLeaveIntermediate());
Assert.assertEquals(true, actual.isCleanupOnFailure());
Assert.assertEquals(true, actual.isOverwriteFiles());
Assert.assertEquals(true, actual.isIgnoreInvalidRows());
Assert.assertEquals(ImmutableMap.<String, String>of(), actual.getJobProperties());
Assert.assertEquals(true, actual.isCombineText());
Assert.assertEquals(true, actual.isPersistInHeap());
Assert.assertEquals(true, actual.isIngestOffheap());
Assert.assertEquals(200, actual.getBufferSize());
Assert.assertEquals(0.1f, actual.getAggregationBufferRatio(), 0.0001);
Assert.assertEquals(true, actual.getUseCombiner());
}
public static <T> T jsonReadWriteRead(String s, Class<T> klass)
{
try {
return jsonMapper.readValue(jsonMapper.writeValueAsBytes(jsonMapper.readValue(s, klass)), klass);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -0,0 +1,179 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
/**
*/
public class IndexGeneratorCombinerTest
{
private AggregatorFactory[] aggregators;
private IndexGeneratorJob.IndexGeneratorCombiner combiner;
@Before
public void setUp() throws Exception
{
HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(
new DataSchema(
"website",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
)
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited"),
new HyperUniquesAggregatorFactory("unique_hosts", "host")
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2010/2011"))
)
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(
"paths",
"/tmp/dummy",
"type",
"static"
),
null,
"/tmp/dummy"
),
HadoopTuningConfig.makeDefaultTuningConfig().withWorkingPath("/tmp/work").withVersion("ver")
)
);
Configuration hadoopConfig = new Configuration();
hadoopConfig.set(
HadoopDruidIndexerConfig.CONFIG_PROPERTY,
HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(config)
);
Reducer.Context context = EasyMock.createMock(Reducer.Context.class);
EasyMock.expect(context.getConfiguration()).andReturn(hadoopConfig);
EasyMock.replay(context);
aggregators = config.getSchema().getDataSchema().getAggregators();
combiner = new IndexGeneratorJob.IndexGeneratorCombiner();
combiner.setup(context);
}
@Test
public void testSingleRowNoMergePassThrough() throws Exception
{
Reducer.Context context = EasyMock.createMock(Reducer.Context.class);
Capture<BytesWritable> captureKey = Capture.newInstance();
Capture<BytesWritable> captureVal = Capture.newInstance();
context.write(EasyMock.capture(captureKey), EasyMock.capture(captureVal));
EasyMock.replay(context);
BytesWritable key = new BytesWritable("dummy_key".getBytes());
BytesWritable val = new BytesWritable("dummy_row".getBytes());
combiner.reduce(key, Lists.newArrayList(val), context);
Assert.assertTrue(captureKey.getValue() == key);
Assert.assertTrue(captureVal.getValue() == val);
}
@Test
public void testMultipleRowsMerged() throws Exception
{
long timestamp = System.currentTimeMillis();
Bucket bucket = new Bucket(0, new DateTime(timestamp), 0);
SortableBytes keySortableBytes = new SortableBytes(
bucket.toGroupKey(),
new byte[0]
);
BytesWritable key = keySortableBytes.toBytesWritable();
InputRow row1 = new MapBasedInputRow(
timestamp,
ImmutableList.<String>of(),
ImmutableMap.<String, Object>of(
"host", "host1",
"visited", 10
)
);
InputRow row2 = new MapBasedInputRow(
timestamp,
ImmutableList.<String>of(),
ImmutableMap.<String, Object>of(
"host", "host2",
"visited", 5
)
);
List<BytesWritable> rows = Lists.newArrayList(
new BytesWritable(InputRowSerde.toBytes(row1, aggregators)),
new BytesWritable(InputRowSerde.toBytes(row2, aggregators))
);
Reducer.Context context = EasyMock.createNiceMock(Reducer.Context.class);
Capture<BytesWritable> captureKey = Capture.newInstance();
Capture<BytesWritable> captureVal = Capture.newInstance();
context.write(EasyMock.capture(captureKey), EasyMock.capture(captureVal));
EasyMock.replay(context);
combiner.reduce(
key,
rows,
context
);
Assert.assertTrue(captureKey.getValue() == key);
InputRow capturedRow = InputRowSerde.fromBytes(captureVal.getValue().getBytes(), aggregators);
Assert.assertEquals(15, capturedRow.getLongMetric("visited_sum"));
Assert.assertEquals(2.0, (Double)HyperUniquesAggregatorFactory.estimateCardinality(capturedRow.getRaw("unique_hosts")), 0.001);
}
}

View File

@ -35,6 +35,7 @@ import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.DataSegment;
@ -70,6 +71,7 @@ public class IndexGeneratorJobTest
return Arrays.asList(
new Object[][]{
{
false,
"single",
"2014-10-22T00:00:00Z/P2D",
new String[][][]{
@ -112,6 +114,7 @@ public class IndexGeneratorJobTest
)
},
{
false,
"hashed",
"2014-10-22T00:00:00Z/P1D",
new Integer[][][]{
@ -139,7 +142,41 @@ public class IndexGeneratorJobTest
"2014102213,n.example.com,234",
"2014102214,o.example.com,325",
"2014102215,p.example.com,3533",
"2014102216,q.example.com,587"
"2014102216,q.example.com,500",
"2014102216,q.example.com,87"
)
},
{
true,
"hashed",
"2014-10-22T00:00:00Z/P1D",
new Integer[][][]{
{
{ 0, 4 },
{ 1, 4 },
{ 2, 4 },
{ 3, 4 }
}
},
ImmutableList.of(
"2014102200,a.example.com,100",
"2014102201,b.exmaple.com,50",
"2014102202,c.example.com,200",
"2014102203,d.example.com,250",
"2014102204,e.example.com,123",
"2014102205,f.example.com,567",
"2014102206,g.example.com,11",
"2014102207,h.example.com,251",
"2014102208,i.example.com,963",
"2014102209,j.example.com,333",
"2014102210,k.example.com,253",
"2014102211,l.example.com,321",
"2014102212,m.example.com,3125",
"2014102213,n.example.com,234",
"2014102214,o.example.com,325",
"2014102215,p.example.com,3533",
"2014102216,q.example.com,500",
"2014102216,q.example.com,87"
)
}
}
@ -156,14 +193,17 @@ public class IndexGeneratorJobTest
private String partitionType;
private Object[][][] shardInfoForEachSegment;
private List<String> data;
private boolean useCombiner;
public IndexGeneratorJobTest(
boolean useCombiner,
String partitionType,
String interval,
Object[][][] shardInfoForEachSegment,
List<String> data
) throws IOException
{
this.useCombiner = useCombiner;
this.partitionType = partitionType;
this.shardInfoForEachSegment = shardInfoForEachSegment;
this.interval = new Interval(interval);
@ -196,7 +236,10 @@ public class IndexGeneratorJobTest
ImmutableList.of("timestamp", "host", "visited_num")
)
),
new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_num", "visited_num"),
new HyperUniquesAggregatorFactory("unique_hosts", "host")
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval)
)
@ -227,7 +270,8 @@ public class IndexGeneratorJobTest
false,
false,
null,
null
null,
useCombiner
)
)
);
@ -325,6 +369,7 @@ public class IndexGeneratorJobTest
Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path"));
Assert.assertEquals("host", dataSegment.getDimensions().get(0));
Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0));
Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1));
Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion());
if (partitionType.equals("hashed")) {
Integer[] hashShardInfo = (Integer[]) shardInfo[partitionNum];

View File

@ -0,0 +1,92 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.jackson.AggregatorsModule;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
*/
public class InputRowSerdeTest
{
private long timestamp;
private List<String> dims;
private Map<String, Object> event;
public InputRowSerdeTest()
{
this.timestamp = System.currentTimeMillis();
this.dims = ImmutableList.of("dim_non_existing", "d1", "d2");
this.event = ImmutableMap.<String, Object>of(
"d1", "d1v",
"d2", ImmutableList.of("d2v1", "d2v2"),
"m1", 5.0f,
"m2", 100L,
"m3", "m3v"
);
}
@Test
public void testSerde()
{
new AggregatorsModule(); //registers ComplexMetric serde for hyperUnique
InputRow in = new MapBasedInputRow(
timestamp,
dims,
event
);
AggregatorFactory[] aggregatorFactories = new AggregatorFactory[] {
new DoubleSumAggregatorFactory("agg_non_existing", "agg_non_existing_in"),
new DoubleSumAggregatorFactory("m1out", "m1"),
new LongSumAggregatorFactory("m2out", "m2"),
new HyperUniquesAggregatorFactory("m3out", "m3")
};
byte[] data = InputRowSerde.toBytes(in, aggregatorFactories);
InputRow out = InputRowSerde.fromBytes(data, aggregatorFactories);
Assert.assertEquals(timestamp, out.getTimestampFromEpoch());
Assert.assertEquals(dims, out.getDimensions());
Assert.assertEquals(Collections.EMPTY_LIST, out.getDimension("dim_non_existing"));
Assert.assertEquals(ImmutableList.of("d1v"), out.getDimension("d1"));
Assert.assertEquals(ImmutableList.of("d2v1", "d2v2"), out.getDimension("d2"));
Assert.assertEquals(0.0f, out.getFloatMetric("agg_non_existing"), 0.00001);
Assert.assertEquals(5.0f, out.getFloatMetric("m1out"), 0.00001);
Assert.assertEquals(100L, out.getLongMetric("m2out"));
Assert.assertEquals(1, ((HyperLogLogCollector)out.getRaw("m3out")).estimateCardinality(), 0.001);
}
}

View File

@ -111,7 +111,8 @@ public class JobHelperTest
false,
false,
null,
null
null,
false
)
)
);

View File

@ -268,7 +268,8 @@ public class HadoopConverterJobTest
false,
false,
null,
null
null,
false
)
)
);

View File

@ -19,6 +19,7 @@ package io.druid.segment.incremental;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
@ -71,9 +72,9 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
{
private volatile DateTime maxIngestedEventTime;
protected static ColumnSelectorFactory makeColumnSelectorFactory(
public static ColumnSelectorFactory makeColumnSelectorFactory(
final AggregatorFactory agg,
final ThreadLocal<InputRow> in,
final Supplier<InputRow> in,
final boolean deserializeComplexMetrics
)
{
@ -260,6 +261,14 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
// This is modified on add() in a critical section.
private ThreadLocal<InputRow> in = new ThreadLocal<>();
private Supplier<InputRow> rowSupplier = new Supplier<InputRow>()
{
@Override
public InputRow get()
{
return in.get();
}
};
/**
* Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
@ -283,7 +292,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
final ImmutableList.Builder<String> metricNamesBuilder = ImmutableList.builder();
final ImmutableMap.Builder<String, Integer> metricIndexesBuilder = ImmutableMap.builder();
final ImmutableMap.Builder<String, String> metricTypesBuilder = ImmutableMap.builder();
this.aggs = initAggs(metrics, in, deserializeComplexMetrics);
this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics);
for (int i = 0; i < metrics.length; i++) {
final String metricName = metrics[i].getName();
@ -343,7 +352,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
protected abstract AggregatorType[] initAggs(
AggregatorFactory[] metrics,
ThreadLocal<InputRow> in,
Supplier<InputRow> rowSupplier,
boolean deserializeComplexMetrics
);
@ -353,7 +362,8 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
InputRow row,
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> in
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier
) throws IndexSizeExceededException;
protected abstract AggregatorType[] getAggsForRow(int rowOffset);
@ -449,7 +459,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
}
final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);
final Integer rv = addToFacts(metrics, deserializeComplexMetrics, row, numEntries, key, in);
final Integer rv = addToFacts(metrics, deserializeComplexMetrics, row, numEntries, key, in, rowSupplier);
updateMaxIngestedTime(row.getTimestamp());
return rv;
}
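The switch from ThreadLocal<InputRow> to Supplier<InputRow> (together with making makeColumnSelectorFactory public) is what lets InputRowSerde aggregate a single in-hand row with no incremental index and no thread-local plumbing. A hedged sketch of that pattern, following the toBytes() code earlier in this commit (metric names and values are illustrative; imports as in the example above):

// Hedged sketch, mirroring InputRowSerde.toBytes(): aggregate one row by handing
// a fixed Supplier<InputRow> to the now-public column selector factory.
final InputRow row = new MapBasedInputRow(
    System.currentTimeMillis(),
    ImmutableList.of("host"),
    ImmutableMap.<String, Object>of("host", "a.example.com", "visited", 10)
);
Supplier<InputRow> supplier = new Supplier<InputRow>()
{
  @Override
  public InputRow get()
  {
    return row;
  }
};

AggregatorFactory factory = new LongSumAggregatorFactory("visited_sum", "visited");
Aggregator agg = factory.factorize(
    IncrementalIndex.makeColumnSelectorFactory(factory, supplier, true)
);
agg.aggregate();           // reads "visited" from the supplied row
long sum = agg.getLong();  // 10, the single-row partial aggregate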

View File

@ -18,6 +18,7 @@
package io.druid.segment.incremental;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.metamx.common.ISE;
import io.druid.collections.ResourceHolder;
@ -156,7 +157,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
@Override
protected BufferAggregator[] initAggs(
AggregatorFactory[] metrics,
ThreadLocal<InputRow> in,
Supplier<InputRow> rowSupplier,
boolean deserializeComplexMetrics
)
{
@ -164,7 +165,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorizeBuffered(
makeColumnSelectorFactory(agg, in, deserializeComplexMetrics)
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)
);
}
return aggs;
@ -177,7 +178,8 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
InputRow row,
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> in
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier
) throws IndexSizeExceededException
{
final BufferAggregator[] aggs = getAggs();
@ -199,13 +201,13 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
}
}
in.set(row);
rowContainer.set(row);
for (int i = 0; i < aggs.length; i++) {
synchronized (aggs[i]) {
aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i));
}
}
in.set(null);
rowContainer.set(null);
return numEntries.get();
}

View File

@ -17,6 +17,7 @@
package io.druid.segment.incremental;
import com.google.common.base.Supplier;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Maps;
@ -112,7 +113,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
@Override
protected Aggregator[] initAggs(
AggregatorFactory[] metrics, ThreadLocal<InputRow> in, boolean deserializeComplexMetrics
AggregatorFactory[] metrics, Supplier<InputRow> rowSupplier, boolean deserializeComplexMetrics
)
{
return new Aggregator[metrics.length];
@ -125,7 +126,8 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
InputRow row,
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> in
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier
) throws IndexSizeExceededException
{
final Integer priorIndex = facts.get(key);
@ -136,10 +138,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
aggs = concurrentGet(priorIndex);
} else {
aggs = new Aggregator[metrics.length];
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize(
makeColumnSelectorFactory(agg, in, deserializeComplexMetrics)
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)
);
}
final Integer rowIndex = indexIncrement.getAndIncrement();
@ -162,7 +165,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
}
}
in.set(row);
rowContainer.set(row);
for (Aggregator agg : aggs) {
synchronized (agg) {
@ -170,7 +173,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
}
}
in.set(null);
rowContainer.set(null);
return numEntries.get();

View File

@ -69,4 +69,32 @@ public abstract class ComplexMetricSerde
{
return null;
}
/**
* Converts intermediate representation of aggregate to byte[].
*
* @param val intermediate representation of aggregate
* @return serialized intermediate representation of aggregate in byte[]
*/
public byte[] toBytes(Object val)
{
return getObjectStrategy().toBytes(val);
}
/**
* Converts byte[] to intermediate representation of the aggregate.
*
* @param data byte array containing the serialized aggregate
* @param start offset in the byte array at which to start reading
* @param numBytes number of bytes to read from the given array
* @return intermediate representation of the aggregate
*/
public Object fromBytes(byte[] data, int start, int numBytes)
{
ByteBuffer bb = ByteBuffer.wrap(data);
if(start > 0) {
bb.position(start);
}
return getObjectStrategy().fromByteBuffer(bb, numBytes);
}
}
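A hedged round-trip sketch for the two new default methods; it assumes the "hyperUnique" serde is registered, which the tests in this commit do via new AggregatorsModule():

// Hedged sketch: serialize an intermediate aggregate and read it back.
new AggregatorsModule(); // registers the "hyperUnique" complex metric serde

ComplexMetricSerde serde = ComplexMetrics.getSerdeForType("hyperUnique");
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();

byte[] bytes = serde.toBytes(collector);                // delegates to getObjectStrategy().toBytes()
Object back = serde.fromBytes(bytes, 0, bytes.length);  // wraps a ByteBuffer, calls fromByteBuffer()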

View File

@ -20,6 +20,7 @@ package io.druid.segment.incremental;
import com.carrotsearch.junitbenchmarks.AbstractBenchmark;
import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
import com.carrotsearch.junitbenchmarks.Clock;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -135,7 +136,8 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
InputRow row,
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> in
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier
) throws IndexSizeExceededException
{
@ -147,10 +149,11 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
aggs = indexedMap.get(priorIdex);
} else {
aggs = new Aggregator[metrics.length];
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize(
makeColumnSelectorFactory(agg, in, deserializeComplexMetrics)
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)
);
}
Integer rowIndex;
@ -176,7 +179,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
}
}
in.set(row);
rowContainer.set(row);
for (Aggregator agg : aggs) {
synchronized (agg) {
@ -184,7 +187,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
}
}
in.set(null);
rowContainer.set(null);
return numEntries.get();