Add useCombiner flag to the Hadoop tuning config

When enabled, Hadoop batch ingestion registers a Hadoop combiner so that rows are merged on the mapper side where possible.
Himanshu Gupta 2015-07-15 13:17:16 -05:00
parent 4ef484048a
commit f836c3a7ac
9 changed files with 430 additions and 41 deletions
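
For context, a minimal sketch (not part of this commit) of how the new flag would be picked up from a job spec's tuningConfig. The JSON fragment and the class name UseCombinerFlagSketch are illustrative assumptions; only the useCombiner property, HadoopDruidIndexerConfig.jsonMapper, and getUseCombiner() come from the code changed below. Properties left out of the fragment fall back to the defaults applied in the HadoopTuningConfig constructor, so useCombiner stays false unless set explicitly.

import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopTuningConfig;

public class UseCombinerFlagSketch
{
  public static void main(String[] args) throws Exception
  {
    // Hypothetical tuningConfig fragment; omitted properties take the defaults
    // applied in the HadoopTuningConfig constructor.
    final String tuningConfigJson = "{\"type\": \"hadoop\", \"useCombiner\": true}";

    final HadoopTuningConfig tuningConfig = HadoopDruidIndexerConfig.jsonMapper
        .readValue(tuningConfigJson, HadoopTuningConfig.class);

    // Prints "true"; with the flag absent it would print "false" and the
    // combiner would not be installed on the indexing job.
    System.out.println(tuningConfig.getUseCombiner());
  }
}

When the flag is true, IndexGeneratorJob registers IndexGeneratorCombiner as the job's combiner (see the IndexGeneratorJob hunks below), which pre-aggregates rows sharing a key on the map side before they reach the reducers.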

View File

@@ -42,6 +42,7 @@ public class HadoopTuningConfig implements TuningConfig
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
private static final int DEFAULT_BUFFER_SIZE = 128 * 1024 * 1024;
private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f;
private static final boolean DEFAULT_USE_COMBINER = false;
public static HadoopTuningConfig makeDefaultTuningConfig()
{
@@ -61,7 +62,8 @@ public class HadoopTuningConfig implements TuningConfig
false,
false,
DEFAULT_BUFFER_SIZE,
DEFAULT_AGG_BUFFER_RATIO
DEFAULT_AGG_BUFFER_RATIO,
DEFAULT_USE_COMBINER
);
}
@@ -81,6 +83,7 @@ public class HadoopTuningConfig implements TuningConfig
private final boolean ingestOffheap;
private final int bufferSize;
private final float aggregationBufferRatio;
private final boolean useCombiner;
@JsonCreator
public HadoopTuningConfig(
@@ -99,7 +102,8 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("persistInHeap") boolean persistInHeap,
final @JsonProperty("ingestOffheap") boolean ingestOffheap,
final @JsonProperty("bufferSize") Integer bufferSize,
final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio
final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio,
final @JsonProperty("useCombiner") Boolean useCombiner
)
{
this.workingPath = workingPath;
@@ -120,6 +124,7 @@ public class HadoopTuningConfig implements TuningConfig
this.ingestOffheap = ingestOffheap;
this.bufferSize = bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize;
this.aggregationBufferRatio = aggregationBufferRatio == null ? DEFAULT_AGG_BUFFER_RATIO : aggregationBufferRatio;
this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue();
}
@JsonProperty
@@ -216,6 +221,12 @@ public class HadoopTuningConfig implements TuningConfig
return aggregationBufferRatio;
}
@JsonProperty
public boolean getUseCombiner()
{
return useCombiner;
}
public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
@@ -234,7 +245,8 @@ public class HadoopTuningConfig implements TuningConfig
persistInHeap,
ingestOffheap,
bufferSize,
aggregationBufferRatio
aggregationBufferRatio,
useCombiner
);
}
@@ -256,7 +268,8 @@ public class HadoopTuningConfig implements TuningConfig
persistInHeap,
ingestOffheap,
bufferSize,
aggregationBufferRatio
aggregationBufferRatio,
useCombiner
);
}
@@ -278,7 +291,8 @@ public class HadoopTuningConfig implements TuningConfig
persistInHeap,
ingestOffheap,
bufferSize,
aggregationBufferRatio
aggregationBufferRatio,
useCombiner
);
}
}

View File

@@ -31,9 +31,9 @@ import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.ParseException;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
@@ -64,11 +64,13 @@ import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -149,6 +151,11 @@ public class IndexGeneratorJob implements Jobby
throw new RuntimeException("No buckets?? seems there is no data to index.");
}
if(config.getSchema().getTuningConfig().getUseCombiner()) {
job.setCombinerClass(IndexGeneratorCombiner.class);
job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class);
}
job.setNumReduceTasks(numReducers);
job.setPartitionerClass(IndexGeneratorPartitioner.class);
@@ -193,6 +200,36 @@ public class IndexGeneratorJob implements Jobby
}
}
private static IncrementalIndex makeIncrementalIndex(
Bucket theBucket,
AggregatorFactory[] aggs,
HadoopDruidIndexerConfig config,
boolean isOffHeap,
StupidPool bufferPool
)
{
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(theBucket.time.getMillis())
.withDimensionsSpec(config.getSchema().getDataSchema().getParser())
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
.withMetrics(aggs)
.build();
if (isOffHeap) {
return new OffheapIncrementalIndex(
indexSchema,
bufferPool,
true,
tuningConfig.getBufferSize()
);
} else {
return new OnheapIncrementalIndex(
indexSchema,
tuningConfig.getRowFlushBoundary()
);
}
}
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, BytesWritable>
{
private static final HashFunction hashFunction = Hashing.murmur3_128();
@@ -245,6 +282,128 @@ public class IndexGeneratorJob implements Jobby
}
}
public static class IndexGeneratorCombiner extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>
{
private HadoopDruidIndexerConfig config;
private AggregatorFactory[] aggregators;
private AggregatorFactory[] combiningAggs;
@Override
protected void setup(Context context)
throws IOException, InterruptedException
{
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
aggregators = config.getSchema().getDataSchema().getAggregators();
combiningAggs = new AggregatorFactory[aggregators.length];
for (int i = 0; i < aggregators.length; ++i) {
combiningAggs[i] = aggregators[i].getCombiningFactory();
}
}
@Override
protected void reduce(
final BytesWritable key, Iterable<BytesWritable> values, final Context context
) throws IOException, InterruptedException
{
Iterator<BytesWritable> iter = values.iterator();
BytesWritable first = iter.next();
if(iter.hasNext()) {
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, false, null);
index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators));
while(iter.hasNext()) {
context.progress();
InputRow value = InputRowSerde.fromBytes(iter.next().getBytes(), aggregators);
if(!index.canAppendRow()) {
log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason());
flushIndexToContextAndClose(key, index, context);
index = makeIncrementalIndex(bucket, combiningAggs, config, false, null);
}
index.add(value);
}
flushIndexToContextAndClose(key, index, context);
} else {
context.write(key, first);
}
}
private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex index, Context context) throws IOException, InterruptedException
{
Iterator<Row> rows = index.iterator();
while(rows.hasNext()) {
context.progress();
Row row = rows.next();
InputRow inputRow = getInputRowFromRow(row, index.getDimensions());
context.write(
key,
new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs))
);
}
index.close();
}
private InputRow getInputRowFromRow(final Row row, final List<String> dimensions) {
return new InputRow()
{
@Override
public List<String> getDimensions()
{
return dimensions;
}
@Override
public long getTimestampFromEpoch()
{
return row.getTimestampFromEpoch();
}
@Override
public DateTime getTimestamp()
{
return row.getTimestamp();
}
@Override
public List<String> getDimension(String dimension)
{
return row.getDimension(dimension);
}
@Override
public Object getRaw(String dimension)
{
return row.getRaw(dimension);
}
@Override
public float getFloatMetric(String metric)
{
return row.getFloatMetric(metric);
}
@Override
public long getLongMetric(String metric)
{
return row.getLongMetric(metric);
}
@Override
public int compareTo(Row o)
{
return row.compareTo(o);
}
};
}
}
public static class IndexGeneratorPartitioner extends Partitioner<BytesWritable, Writable> implements Configurable
{
private Configuration config;
@@ -350,7 +509,13 @@ public class IndexGeneratorJob implements Jobby
* config.getSchema().getTuningConfig().getAggregationBufferRatio());
final StupidPool<ByteBuffer> bufferPool = new OffheapBufferPool(aggregationBufferSize);
IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, bufferPool);
IncrementalIndex index = makeIncrementalIndex(
bucket,
combiningAggs,
config,
config.getSchema().getTuningConfig().isIngestOffheap(),
bufferPool
);
try {
File baseFlushFile = File.createTempFile("base", "flush");
baseFlushFile.delete();
@@ -392,7 +557,13 @@ public class IndexGeneratorJob implements Jobby
// close this index and make a new one, reusing same buffer
index.close();
index = makeIncrementalIndex(bucket, combiningAggs, bufferPool);
index = makeIncrementalIndex(
bucket,
combiningAggs,
config,
config.getSchema().getTuningConfig().isIngestOffheap(),
bufferPool
);
startTime = System.currentTimeMillis();
++indexCount;
}
@@ -475,31 +646,6 @@ public class IndexGeneratorJob implements Jobby
index.close();
}
}
private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs, StupidPool bufferPool)
{
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(theBucket.time.getMillis())
.withDimensionsSpec(config.getSchema().getDataSchema().getParser())
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
.withMetrics(aggs)
.build();
if (tuningConfig.isIngestOffheap()) {
return new OffheapIncrementalIndex(
indexSchema,
bufferPool,
true,
tuningConfig.getBufferSize()
);
} else {
return new OnheapIncrementalIndex(
indexSchema,
tuningConfig.getRowFlushBoundary()
);
}
}
}
public static class IndexGeneratorOutputFormat extends TextOutputFormat

View File

@@ -153,7 +153,8 @@ public class DetermineHashedPartitionsJobTest
false,
false,
null,
null
null,
false
)
);
this.indexerConfig = new HadoopDruidIndexerConfig(ingestionSpec);

View File

@@ -262,7 +262,8 @@ public class DeterminePartitionsJobTest
false,
false,
null,
null
null,
false
)
)
);

View File

@@ -197,7 +197,8 @@ public class HadoopDruidIndexerConfigTest
false,
false,
null,
null
null,
false
)
);
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(spec);

View File

@@ -0,0 +1,179 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
/**
*/
public class IndexGeneratorCombinerTest
{
private AggregatorFactory[] aggregators;
private IndexGeneratorJob.IndexGeneratorCombiner combiner;
@Before
public void setUp() throws Exception
{
HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(
new DataSchema(
"website",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
)
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited"),
new HyperUniquesAggregatorFactory("unique_hosts", "host")
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2010/2011"))
)
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(
"paths",
"/tmp/dummy",
"type",
"static"
),
null,
"/tmp/dummy"
),
HadoopTuningConfig.makeDefaultTuningConfig().withWorkingPath("/tmp/work").withVersion("ver")
)
);
Configuration hadoopConfig = new Configuration();
hadoopConfig.set(
HadoopDruidIndexerConfig.CONFIG_PROPERTY,
HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(config)
);
Reducer.Context context = EasyMock.createMock(Reducer.Context.class);
EasyMock.expect(context.getConfiguration()).andReturn(hadoopConfig);
EasyMock.replay(context);
aggregators = config.getSchema().getDataSchema().getAggregators();
combiner = new IndexGeneratorJob.IndexGeneratorCombiner();
combiner.setup(context);
}
@Test
public void testSingleRowNoMergePassThrough() throws Exception
{
Reducer.Context context = EasyMock.createMock(Reducer.Context.class);
Capture<BytesWritable> captureKey = Capture.newInstance();
Capture<BytesWritable> captureVal = Capture.newInstance();
context.write(EasyMock.capture(captureKey), EasyMock.capture(captureVal));
EasyMock.replay(context);
BytesWritable key = new BytesWritable("dummy_key".getBytes());
BytesWritable val = new BytesWritable("dummy_row".getBytes());
combiner.reduce(key, Lists.newArrayList(val), context);
Assert.assertTrue(captureKey.getValue() == key);
Assert.assertTrue(captureVal.getValue() == val);
}
@Test
public void testMultipleRowsMerged() throws Exception
{
long timestamp = System.currentTimeMillis();
Bucket bucket = new Bucket(0, new DateTime(timestamp), 0);
SortableBytes keySortableBytes = new SortableBytes(
bucket.toGroupKey(),
new byte[0]
);
BytesWritable key = keySortableBytes.toBytesWritable();
InputRow row1 = new MapBasedInputRow(
timestamp,
ImmutableList.<String>of(),
ImmutableMap.<String, Object>of(
"host", "host1",
"visited", 10
)
);
InputRow row2 = new MapBasedInputRow(
timestamp,
ImmutableList.<String>of(),
ImmutableMap.<String, Object>of(
"host", "host2",
"visited", 5
)
);
List<BytesWritable> rows = Lists.newArrayList(
new BytesWritable(InputRowSerde.toBytes(row1, aggregators)),
new BytesWritable(InputRowSerde.toBytes(row2, aggregators))
);
Reducer.Context context = EasyMock.createNiceMock(Reducer.Context.class);
Capture<BytesWritable> captureKey = Capture.newInstance();
Capture<BytesWritable> captureVal = Capture.newInstance();
context.write(EasyMock.capture(captureKey), EasyMock.capture(captureVal));
EasyMock.replay(context);
combiner.reduce(
key,
rows,
context
);
Assert.assertTrue(captureKey.getValue() == key);
InputRow capturedRow = InputRowSerde.fromBytes(captureVal.getValue().getBytes(), aggregators);
Assert.assertEquals(15, capturedRow.getLongMetric("visited_sum"));
Assert.assertEquals(2.0, (Double)HyperUniquesAggregatorFactory.estimateCardinality(capturedRow.getRaw("unique_hosts")), 0.001);
}
}

View File

@@ -35,6 +35,7 @@ import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.DataSegment;
@@ -70,6 +71,7 @@ public class IndexGeneratorJobTest
return Arrays.asList(
new Object[][]{
{
false,
"single",
"2014-10-22T00:00:00Z/P2D",
new String[][][]{
@@ -112,6 +114,7 @@ public class IndexGeneratorJobTest
)
},
{
false,
"hashed",
"2014-10-22T00:00:00Z/P1D",
new Integer[][][]{
@@ -139,7 +142,41 @@ public class IndexGeneratorJobTest
"2014102213,n.example.com,234",
"2014102214,o.example.com,325",
"2014102215,p.example.com,3533",
"2014102216,q.example.com,587"
"2014102216,q.example.com,500",
"2014102216,q.example.com,87"
)
},
{
true,
"hashed",
"2014-10-22T00:00:00Z/P1D",
new Integer[][][]{
{
{ 0, 4 },
{ 1, 4 },
{ 2, 4 },
{ 3, 4 }
}
},
ImmutableList.of(
"2014102200,a.example.com,100",
"2014102201,b.exmaple.com,50",
"2014102202,c.example.com,200",
"2014102203,d.example.com,250",
"2014102204,e.example.com,123",
"2014102205,f.example.com,567",
"2014102206,g.example.com,11",
"2014102207,h.example.com,251",
"2014102208,i.example.com,963",
"2014102209,j.example.com,333",
"2014102210,k.example.com,253",
"2014102211,l.example.com,321",
"2014102212,m.example.com,3125",
"2014102213,n.example.com,234",
"2014102214,o.example.com,325",
"2014102215,p.example.com,3533",
"2014102216,q.example.com,500",
"2014102216,q.example.com,87"
)
}
}
@@ -156,14 +193,17 @@ public class IndexGeneratorJobTest
private String partitionType;
private Object[][][] shardInfoForEachSegment;
private List<String> data;
private boolean useCombiner;
public IndexGeneratorJobTest(
boolean useCombiner,
String partitionType,
String interval,
Object[][][] shardInfoForEachSegment,
List<String> data
) throws IOException
{
this.useCombiner = useCombiner;
this.partitionType = partitionType;
this.shardInfoForEachSegment = shardInfoForEachSegment;
this.interval = new Interval(interval);
@@ -196,7 +236,10 @@ public class IndexGeneratorJobTest
ImmutableList.of("timestamp", "host", "visited_num")
)
),
new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_num", "visited_num"),
new HyperUniquesAggregatorFactory("unique_hosts", "host")
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval)
)
@@ -227,7 +270,8 @@ public class IndexGeneratorJobTest
false,
false,
null,
null
null,
useCombiner
)
)
);
@@ -325,6 +369,7 @@ public class IndexGeneratorJobTest
Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path"));
Assert.assertEquals("host", dataSegment.getDimensions().get(0));
Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0));
Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1));
Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion());
if (partitionType.equals("hashed")) {
Integer[] hashShardInfo = (Integer[]) shardInfo[partitionNum];

View File

@@ -111,7 +111,8 @@ public class JobHelperTest
false,
false,
null,
null
null,
false
)
)
);

View File

@@ -268,7 +268,8 @@ public class HadoopConverterJobTest
false,
false,
null,
null
null,
false
)
)
);