From 0439e8ec23d3f0fa2f276b60c2009cb5900e880f Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 15 Jul 2015 12:05:49 -0500 Subject: [PATCH 1/5] adding serde methods for intermediate aggregation object to ComplexMetricSerde This provides the alternative to using ComplexMetricSerde.getObjectStrategy() and using the serde methods from ObjectStrategy as that usage pattern is deprecated. --- .../segment/serde/ComplexMetricSerde.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/processing/src/main/java/io/druid/segment/serde/ComplexMetricSerde.java b/processing/src/main/java/io/druid/segment/serde/ComplexMetricSerde.java index cdcf2e218bd..9eb9bb46cae 100644 --- a/processing/src/main/java/io/druid/segment/serde/ComplexMetricSerde.java +++ b/processing/src/main/java/io/druid/segment/serde/ComplexMetricSerde.java @@ -69,4 +69,32 @@ public abstract class ComplexMetricSerde { return null; } + + /** + * Converts intermediate representation of aggregate to byte[]. + * + * @param val intermediate representation of aggregate + * @return serialized intermediate representation of aggregate in byte[] + */ + public byte[] toBytes(Object val) + { + return getObjectStrategy().toBytes(val); + } + + /** + * Converts byte[] to intermediate representation of the aggregate. + * + * @param byte array + * @param start offset in the byte array where to start reading + * @param numBytes number of bytes to read in given array + * @return intermediate representation of the aggregate + */ + public Object fromBytes(byte[] data, int start, int numBytes) + { + ByteBuffer bb = ByteBuffer.wrap(data); + if(start > 0) { + bb.position(start); + } + return getObjectStrategy().fromByteBuffer(bb, numBytes); + } } From f7a92db332485172ef5f0239d26b7c480b1f2b9d Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 15 Jul 2015 12:09:25 -0500 Subject: [PATCH 2/5] generic byte[] serde for InputRow --- .../java/io/druid/indexer/InputRowSerde.java | 228 ++++++++++++++++++ .../io/druid/indexer/InputRowSerdeTest.java | 92 +++++++ .../segment/incremental/IncrementalIndex.java | 22 +- .../incremental/OffheapIncrementalIndex.java | 12 +- .../incremental/OnheapIncrementalIndex.java | 13 +- .../OnheapIncrementalIndexBenchmark.java | 11 +- 6 files changed, 358 insertions(+), 20 deletions(-) create mode 100644 indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java create mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java b/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java new file mode 100644 index 00000000000..c7707e94db2 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java @@ -0,0 +1,228 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. 
See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer; + +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.google.common.collect.Maps; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import com.metamx.common.IAE; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.serde.ComplexMetricSerde; +import io.druid.segment.serde.ComplexMetrics; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + */ +public class InputRowSerde +{ + private static final Logger log = new Logger(InputRowSerde.class); + + private static final Text[] EMPTY_TEXT_ARRAY = new Text[0]; + + public static final byte[] toBytes(final InputRow row, AggregatorFactory[] aggs) + { + try { + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + + //write timestamp + out.writeLong(row.getTimestampFromEpoch()); + + //writing all dimensions + List dimList = row.getDimensions(); + + Text[] dims = EMPTY_TEXT_ARRAY; + if(dimList != null) { + dims = new Text[dimList.size()]; + for (int i = 0; i < dims.length; i++) { + dims[i] = new Text(dimList.get(i)); + } + } + StringArrayWritable sw = new StringArrayWritable(dims); + sw.write(out); + + MapWritable mw = new MapWritable(); + + if(dimList != null) { + for (String dim : dimList) { + List dimValue = row.getDimension(dim); + + if (dimValue == null || dimValue.size() == 0) { + continue; + } + + if (dimValue.size() == 1) { + mw.put(new Text(dim), new Text(dimValue.get(0))); + } else { + Text[] dimValueArr = new Text[dimValue.size()]; + for (int i = 0; i < dimValueArr.length; i++) { + dimValueArr[i] = new Text(dimValue.get(i)); + } + mw.put(new Text(dim), new StringArrayWritable(dimValueArr)); + } + } + } + + //writing all metrics + Supplier supplier = new Supplier() + { + @Override + public InputRow get() + { + return row; + } + }; + for (AggregatorFactory aggFactory : aggs) { + String k = aggFactory.getName(); + + Aggregator agg = aggFactory.factorize( + IncrementalIndex.makeColumnSelectorFactory( + aggFactory, + supplier, + true + ) + ); + agg.aggregate(); + + String t = aggFactory.getTypeName(); + + if (t.equals("float")) { + mw.put(new Text(k), new FloatWritable(agg.getFloat())); + } else if (t.equals("long")) { + mw.put(new Text(k), new LongWritable(agg.getLong())); + } else { + //its a complex metric + Object val = agg.get(); + ComplexMetricSerde serde = getComplexMetricSerde(t); + mw.put(new Text(k), new BytesWritable(serde.toBytes(val))); + } + } + + mw.write(out); + return out.toByteArray(); + } catch(IOException ex) { + throw Throwables.propagate(ex); + } + } + + public static final InputRow fromBytes(byte[] data, AggregatorFactory[] aggs) + { + try { + DataInput in = ByteStreams.newDataInput(data); + + //Read timestamp + long timestamp = in.readLong(); + + 
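//Note: the reads below mirror the write order in toBytes(): timestamp (long),
+      //dimension names (StringArrayWritable), then a single MapWritable holding
+      //dimension values and serialized metric values.
+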
//Read dimensions + StringArrayWritable sw = new StringArrayWritable(); + sw.readFields(in); + List dimensions = Arrays.asList(sw.toStrings()); + + MapWritable mw = new MapWritable(); + mw.readFields(in); + + Map event = Maps.newHashMap(); + + for (String d : dimensions) { + Writable v = mw.get(new Text(d)); + + if (v == null) { + continue; + } + + if (v instanceof Text) { + event.put(d, ((Text) v).toString()); + } else if (v instanceof StringArrayWritable) { + event.put(d, Arrays.asList(((StringArrayWritable) v).toStrings())); + } else { + throw new ISE("unknown dim value type %s", v.getClass().getName()); + } + } + + //Read metrics + for (AggregatorFactory aggFactory : aggs) { + String k = aggFactory.getName(); + Writable v = mw.get(new Text(k)); + + if (v == null) { + continue; + } + + String t = aggFactory.getTypeName(); + + if (t.equals("float")) { + event.put(k, ((FloatWritable) v).get()); + } else if (t.equals("long")) { + event.put(k, ((LongWritable) v).get()); + } else { + //its a complex metric + ComplexMetricSerde serde = getComplexMetricSerde(t); + BytesWritable bw = (BytesWritable) v; + event.put(k, serde.fromBytes(bw.getBytes(), 0, bw.getLength())); + } + } + + return new MapBasedInputRow(timestamp, dimensions, event); + } catch(IOException ex) { + throw Throwables.propagate(ex); + } + } + + private static ComplexMetricSerde getComplexMetricSerde(String type) + { + ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(type); + if (serde == null) { + throw new IAE("Unknown type[%s]", type); + } + return serde; + } +} + +class StringArrayWritable extends ArrayWritable +{ + public StringArrayWritable() + { + super(Text.class); + } + + public StringArrayWritable(Text[] strs) + { + super(Text.class, strs); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java new file mode 100644 index 00000000000..8c10f7aa091 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java @@ -0,0 +1,92 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package io.druid.indexer; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.jackson.AggregatorsModule; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector; +import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + */ +public class InputRowSerdeTest +{ + private long timestamp; + private List dims; + private Map event; + + public InputRowSerdeTest() + { + this.timestamp = System.currentTimeMillis(); + this.dims = ImmutableList.of("dim_non_existing", "d1", "d2"); + this.event = ImmutableMap.of( + "d1", "d1v", + "d2", ImmutableList.of("d2v1", "d2v2"), + "m1", 5.0f, + "m2", 100L, + "m3", "m3v" + ); + } + + @Test + public void testSerde() + { + new AggregatorsModule(); //registers ComplexMetric serde for hyperUnique + + InputRow in = new MapBasedInputRow( + timestamp, + dims, + event + ); + + AggregatorFactory[] aggregatorFactories = new AggregatorFactory[] { + new DoubleSumAggregatorFactory("agg_non_existing", "agg_non_existing_in"), + new DoubleSumAggregatorFactory("m1out", "m1"), + new LongSumAggregatorFactory("m2out", "m2"), + new HyperUniquesAggregatorFactory("m3out", "m3") + }; + + byte[] data = InputRowSerde.toBytes(in, aggregatorFactories); + InputRow out = InputRowSerde.fromBytes(data, aggregatorFactories); + + Assert.assertEquals(timestamp, out.getTimestampFromEpoch()); + Assert.assertEquals(dims, out.getDimensions()); + Assert.assertEquals(Collections.EMPTY_LIST, out.getDimension("dim_non_existing")); + Assert.assertEquals(ImmutableList.of("d1v"), out.getDimension("d1")); + Assert.assertEquals(ImmutableList.of("d2v1", "d2v2"), out.getDimension("d2")); + + Assert.assertEquals(0.0f, out.getFloatMetric("agg_non_existing"), 0.00001); + Assert.assertEquals(5.0f, out.getFloatMetric("m1out"), 0.00001); + Assert.assertEquals(100L, out.getLongMetric("m2out")); + Assert.assertEquals(1, ((HyperLogLogCollector)out.getRaw("m3out")).estimateCardinality(), 0.001); + } +} diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 92bcb460b38..5427d1173a3 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -19,6 +19,7 @@ package io.druid.segment.incremental; import com.google.common.base.Function; import com.google.common.base.Strings; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; @@ -71,9 +72,9 @@ public abstract class IncrementalIndex implements Iterable, { private volatile DateTime maxIngestedEventTime; - protected static ColumnSelectorFactory makeColumnSelectorFactory( + public static ColumnSelectorFactory makeColumnSelectorFactory( final AggregatorFactory agg, - final ThreadLocal in, + final Supplier in, final boolean deserializeComplexMetrics ) { @@ -260,6 +261,14 @@ public abstract class IncrementalIndex implements Iterable, // 
This is modified on add() in a critical section. private ThreadLocal in = new ThreadLocal<>(); + private Supplier rowSupplier = new Supplier() + { + @Override + public InputRow get() + { + return in.get(); + } + }; /** * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that @@ -283,7 +292,7 @@ public abstract class IncrementalIndex implements Iterable, final ImmutableList.Builder metricNamesBuilder = ImmutableList.builder(); final ImmutableMap.Builder metricIndexesBuilder = ImmutableMap.builder(); final ImmutableMap.Builder metricTypesBuilder = ImmutableMap.builder(); - this.aggs = initAggs(metrics, in, deserializeComplexMetrics); + this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics); for (int i = 0; i < metrics.length; i++) { final String metricName = metrics[i].getName(); @@ -343,7 +352,7 @@ public abstract class IncrementalIndex implements Iterable, protected abstract AggregatorType[] initAggs( AggregatorFactory[] metrics, - ThreadLocal in, + Supplier rowSupplier, boolean deserializeComplexMetrics ); @@ -353,7 +362,8 @@ public abstract class IncrementalIndex implements Iterable, InputRow row, AtomicInteger numEntries, TimeAndDims key, - ThreadLocal in + ThreadLocal rowContainer, + Supplier rowSupplier ) throws IndexSizeExceededException; protected abstract AggregatorType[] getAggsForRow(int rowOffset); @@ -449,7 +459,7 @@ public abstract class IncrementalIndex implements Iterable, } final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims); - final Integer rv = addToFacts(metrics, deserializeComplexMetrics, row, numEntries, key, in); + final Integer rv = addToFacts(metrics, deserializeComplexMetrics, row, numEntries, key, in, rowSupplier); updateMaxIngestedTime(row.getTimestamp()); return rv; } diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 32cce611b9d..dffdd2de79f 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -18,6 +18,7 @@ package io.druid.segment.incremental; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.metamx.common.ISE; import io.druid.collections.ResourceHolder; @@ -156,7 +157,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex @Override protected BufferAggregator[] initAggs( AggregatorFactory[] metrics, - ThreadLocal in, + Supplier rowSupplier, boolean deserializeComplexMetrics ) { @@ -164,7 +165,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex for (int i = 0; i < metrics.length; i++) { final AggregatorFactory agg = metrics[i]; aggs[i] = agg.factorizeBuffered( - makeColumnSelectorFactory(agg, in, deserializeComplexMetrics) + makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics) ); } return aggs; @@ -177,7 +178,8 @@ public class OffheapIncrementalIndex extends IncrementalIndex InputRow row, AtomicInteger numEntries, TimeAndDims key, - ThreadLocal in + ThreadLocal rowContainer, + Supplier rowSupplier ) throws IndexSizeExceededException { final BufferAggregator[] aggs = getAggs(); @@ -199,13 +201,13 @@ public class OffheapIncrementalIndex extends IncrementalIndex } } } - in.set(row); + rowContainer.set(row); for (int i = 0; i < aggs.length; 
i++) { synchronized (aggs[i]) { aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i)); } } - in.set(null); + rowContainer.set(null); return numEntries.get(); } diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 16b66c8b3a1..cbc034c49e0 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -17,6 +17,7 @@ package io.druid.segment.incremental; +import com.google.common.base.Supplier; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.Maps; @@ -112,7 +113,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex @Override protected Aggregator[] initAggs( - AggregatorFactory[] metrics, ThreadLocal in, boolean deserializeComplexMetrics + AggregatorFactory[] metrics, Supplier rowSupplier, boolean deserializeComplexMetrics ) { return new Aggregator[metrics.length]; @@ -125,7 +126,8 @@ public class OnheapIncrementalIndex extends IncrementalIndex InputRow row, AtomicInteger numEntries, TimeAndDims key, - ThreadLocal in + ThreadLocal rowContainer, + Supplier rowSupplier ) throws IndexSizeExceededException { final Integer priorIndex = facts.get(key); @@ -136,10 +138,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex aggs = concurrentGet(priorIndex); } else { aggs = new Aggregator[metrics.length]; + for (int i = 0; i < metrics.length; i++) { final AggregatorFactory agg = metrics[i]; aggs[i] = agg.factorize( - makeColumnSelectorFactory(agg, in, deserializeComplexMetrics) + makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics) ); } final Integer rowIndex = indexIncrement.getAndIncrement(); @@ -162,7 +165,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex } } - in.set(row); + rowContainer.set(row); for (Aggregator agg : aggs) { synchronized (agg) { @@ -170,7 +173,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex } } - in.set(null); + rowContainer.set(null); return numEntries.get(); diff --git a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index 21c38701b43..1a67ec12e5b 100644 --- a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -20,6 +20,7 @@ package io.druid.segment.incremental; import com.carrotsearch.junitbenchmarks.AbstractBenchmark; import com.carrotsearch.junitbenchmarks.BenchmarkOptions; import com.carrotsearch.junitbenchmarks.Clock; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -135,7 +136,8 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark InputRow row, AtomicInteger numEntries, TimeAndDims key, - ThreadLocal in + ThreadLocal rowContainer, + Supplier rowSupplier ) throws IndexSizeExceededException { @@ -147,10 +149,11 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark aggs = indexedMap.get(priorIdex); } else { aggs = new Aggregator[metrics.length]; + for (int i = 0; i < metrics.length; i++) { final AggregatorFactory agg = 
metrics[i]; aggs[i] = agg.factorize( - makeColumnSelectorFactory(agg, in, deserializeComplexMetrics) + makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics) ); } Integer rowIndex; @@ -176,7 +179,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark } } - in.set(row); + rowContainer.set(row); for (Aggregator agg : aggs) { synchronized (agg) { @@ -184,7 +187,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark } } - in.set(null); + rowContainer.set(null); return numEntries.get(); From 4ef484048a4baa050f917ac939d8b6dda18832cf Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 15 Jul 2015 13:00:57 -0500 Subject: [PATCH 3/5] take control of InputRow serde between Mapper/Reducer in Hadoop Indexing This allows for arbitrary InputFormat while hadoop batch ingestion that can return records of value type other than Text --- .../indexer/DetermineHashedPartitionsJob.java | 2 +- .../druid/indexer/DeterminePartitionsJob.java | 8 +-- .../indexer/HadoopDruidIndexerMapper.java | 10 +-- .../io/druid/indexer/IndexGeneratorJob.java | 64 +++++++++---------- 4 files changed, 42 insertions(+), 42 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java index f4e96b5cc05..e217b82ee3e 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java @@ -235,7 +235,7 @@ public class DetermineHashedPartitionsJob implements Jobby @Override protected void innerMap( InputRow inputRow, - Writable value, + Object value, Context context ) throws IOException, InterruptedException { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java index 62dd90470ee..a21e5f7437b 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java @@ -259,7 +259,7 @@ public class DeterminePartitionsJob implements Jobby @Override protected void innerMap( InputRow inputRow, - Writable value, + Object value, Context context ) throws IOException, InterruptedException { @@ -340,7 +340,7 @@ public class DeterminePartitionsJob implements Jobby @Override protected void innerMap( InputRow inputRow, - Writable value, + Object value, Context context ) throws IOException, InterruptedException { @@ -378,7 +378,7 @@ public class DeterminePartitionsJob implements Jobby } public void emitDimValueCounts( - TaskInputOutputContext context, + TaskInputOutputContext context, DateTime timestamp, Map> dims ) throws IOException, InterruptedException @@ -891,7 +891,7 @@ public class DeterminePartitionsJob implements Jobby } private static void write( - TaskInputOutputContext context, + TaskInputOutputContext context, final byte[] groupKey, DimValueCount dimValueCount ) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java index bd561106f70..e90a0c59018 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java @@ -41,11 +41,11 @@ import org.joda.time.DateTime; import com.metamx.common.RE; -public abstract class 
HadoopDruidIndexerMapper extends Mapper +public abstract class HadoopDruidIndexerMapper extends Mapper { private static final Logger log = new Logger(HadoopDruidIndexerMapper.class); - private HadoopDruidIndexerConfig config; + protected HadoopDruidIndexerConfig config; private InputRowParser parser; protected GranularitySpec granularitySpec; @@ -70,7 +70,7 @@ public abstract class HadoopDruidIndexerMapper extends Mapper< @Override protected void map( - Writable key, Writable value, Context context + Object key, Object value, Context context ) throws IOException, InterruptedException { try { @@ -99,7 +99,7 @@ public abstract class HadoopDruidIndexerMapper extends Mapper< } } - public final static InputRow parseInputRow(Writable value, InputRowParser parser) + public final static InputRow parseInputRow(Object value, InputRowParser parser) { if(parser instanceof StringInputRowParser && value instanceof Text) { //Note: This is to ensure backward compatibility with 0.7.0 and before @@ -109,7 +109,7 @@ public abstract class HadoopDruidIndexerMapper extends Mapper< } } - abstract protected void innerMap(InputRow inputRow, Writable value, Context context) + abstract protected void innerMap(InputRow inputRow, Object value, Context context) throws IOException, InterruptedException; } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index f69af4aa27e..a8b7ed14567 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -35,7 +35,6 @@ import com.metamx.common.parsers.ParseException; import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; import io.druid.data.input.Rows; -import io.druid.data.input.impl.InputRowParser; import io.druid.offheap.OffheapBufferPool; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.IndexIO; @@ -141,7 +140,7 @@ public class IndexGeneratorJob implements Jobby JobHelper.setInputFormat(job, config); job.setMapperClass(IndexGeneratorMapper.class); - job.setMapOutputValueClass(Text.class); + job.setMapOutputValueClass(BytesWritable.class); SortableBytes.useSortableBytesAsMapOutputKey(job); @@ -149,6 +148,7 @@ public class IndexGeneratorJob implements Jobby if (numReducers == 0) { throw new RuntimeException("No buckets?? 
seems there is no data to index."); } + job.setNumReduceTasks(numReducers); job.setPartitionerClass(IndexGeneratorPartitioner.class); @@ -193,14 +193,24 @@ public class IndexGeneratorJob implements Jobby } } - public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper + public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper { private static final HashFunction hashFunction = Hashing.murmur3_128(); + private AggregatorFactory[] aggregators; + + @Override + protected void setup(Context context) + throws IOException, InterruptedException + { + super.setup(context); + aggregators = config.getSchema().getDataSchema().getAggregators(); + } + @Override protected void innerMap( InputRow inputRow, - Writable value, + Object value, Context context ) throws IOException, InterruptedException { @@ -230,7 +240,7 @@ public class IndexGeneratorJob implements Jobby .put(hashedDimensions) .array() ).toBytesWritable(), - value + new BytesWritable(InputRowSerde.toBytes(inputRow, aggregators)) ); } } @@ -269,11 +279,12 @@ public class IndexGeneratorJob implements Jobby } } - public static class IndexGeneratorReducer extends Reducer + public static class IndexGeneratorReducer extends Reducer { protected HadoopDruidIndexerConfig config; private List metricNames = Lists.newArrayList(); - private InputRowParser parser; + private AggregatorFactory[] aggregators; + private AggregatorFactory[] combiningAggs; protected ProgressIndicator makeProgressIndicator(final Context context) { @@ -317,29 +328,29 @@ public class IndexGeneratorJob implements Jobby { config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); - for (AggregatorFactory factory : config.getSchema().getDataSchema().getAggregators()) { - metricNames.add(factory.getName()); + aggregators = config.getSchema().getDataSchema().getAggregators(); + combiningAggs = new AggregatorFactory[aggregators.length]; + for (int i = 0; i < aggregators.length; ++i) { + metricNames.add(aggregators[i].getName()); + combiningAggs[i] = aggregators[i].getCombiningFactory(); } - - parser = config.getParser(); } @Override protected void reduce( - BytesWritable key, Iterable values, final Context context + BytesWritable key, Iterable values, final Context context ) throws IOException, InterruptedException { SortableBytes keyBytes = SortableBytes.fromBytesWritable(key); Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs; final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get(); - final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators(); final int maxTotalBufferSize = config.getSchema().getTuningConfig().getBufferSize(); final int aggregationBufferSize = (int) ((double) maxTotalBufferSize * config.getSchema().getTuningConfig().getAggregationBufferRatio()); final StupidPool bufferPool = new OffheapBufferPool(aggregationBufferSize); - IncrementalIndex index = makeIncrementalIndex(bucket, aggs, bufferPool); + IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, bufferPool); try { File baseFlushFile = File.createTempFile("base", "flush"); baseFlushFile.delete(); @@ -354,24 +365,13 @@ public class IndexGeneratorJob implements Jobby Set allDimensionNames = Sets.newHashSet(); final ProgressIndicator progressIndicator = makeProgressIndicator(context); - for (final Writable value : values) { + for (final BytesWritable bw : values) { context.progress(); - int numRows; - try { - final InputRow inputRow = 
index.formatRow(HadoopDruidIndexerMapper.parseInputRow(value, parser)); - allDimensionNames.addAll(inputRow.getDimensions()); - numRows = index.add(inputRow); - } - catch (ParseException e) { - if (config.isIgnoreInvalidRows()) { - log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString()); - context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1); - continue; - } else { - throw e; - } - } + final InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(bw.getBytes(), aggregators)); + allDimensionNames.addAll(inputRow.getDimensions()); + int numRows = index.add(inputRow); + ++lineCount; if (!index.canAppendRow()) { @@ -391,8 +391,8 @@ public class IndexGeneratorJob implements Jobby persist(index, interval, file, progressIndicator); // close this index and make a new one, reusing same buffer index.close(); - index = makeIncrementalIndex(bucket, aggs, bufferPool); + index = makeIncrementalIndex(bucket, combiningAggs, bufferPool); startTime = System.currentTimeMillis(); ++indexCount; } @@ -421,7 +421,7 @@ public class IndexGeneratorJob implements Jobby indexes.add(IndexIO.loadIndex(file)); } mergedBase = mergeQueryableIndex( - indexes, aggs, new File(baseFlushFile, "merged"), progressIndicator + indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator ); } final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath()) From f836c3a7acccc3c286af52470dda676c58324156 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 15 Jul 2015 13:17:16 -0500 Subject: [PATCH 4/5] adding flag useCombiner to hadoop tuning config that can be used to add a hadoop combiner to hadoop batch ingestion to do merges on the mappers if possible --- .../io/druid/indexer/HadoopTuningConfig.java | 24 ++- .../io/druid/indexer/IndexGeneratorJob.java | 202 +++++++++++++++--- .../DetermineHashedPartitionsJobTest.java | 3 +- .../indexer/DeterminePartitionsJobTest.java | 3 +- .../indexer/HadoopDruidIndexerConfigTest.java | 3 +- .../indexer/IndexGeneratorCombinerTest.java | 179 ++++++++++++++++ .../druid/indexer/IndexGeneratorJobTest.java | 51 ++++- .../java/io/druid/indexer/JobHelperTest.java | 3 +- .../updater/HadoopConverterJobTest.java | 3 +- 9 files changed, 430 insertions(+), 41 deletions(-) create mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java index 4a4e23ae7ba..69a244ea84b 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java @@ -42,6 +42,7 @@ public class HadoopTuningConfig implements TuningConfig private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000; private static final int DEFAULT_BUFFER_SIZE = 128 * 1024 * 1024; private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f; + private static final boolean DEFAULT_USE_COMBINER = false; public static HadoopTuningConfig makeDefaultTuningConfig() { @@ -61,7 +62,8 @@ public class HadoopTuningConfig implements TuningConfig false, false, DEFAULT_BUFFER_SIZE, - DEFAULT_AGG_BUFFER_RATIO + DEFAULT_AGG_BUFFER_RATIO, + DEFAULT_USE_COMBINER ); } @@ -81,6 +83,7 @@ public class HadoopTuningConfig implements TuningConfig private final boolean ingestOffheap; private final int bufferSize; private final float aggregationBufferRatio; + private final boolean useCombiner; 
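+  // useCombiner: when true, IndexGeneratorJob adds a Hadoop combiner (IndexGeneratorCombiner)
+  // so rows sharing a key can be merged on the mappers before being shuffled to the reducers.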
@JsonCreator public HadoopTuningConfig( @@ -99,7 +102,8 @@ public class HadoopTuningConfig implements TuningConfig final @JsonProperty("persistInHeap") boolean persistInHeap, final @JsonProperty("ingestOffheap") boolean ingestOffheap, final @JsonProperty("bufferSize") Integer bufferSize, - final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio + final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio, + final @JsonProperty("useCombiner") Boolean useCombiner ) { this.workingPath = workingPath; @@ -120,6 +124,7 @@ public class HadoopTuningConfig implements TuningConfig this.ingestOffheap = ingestOffheap; this.bufferSize = bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize; this.aggregationBufferRatio = aggregationBufferRatio == null ? DEFAULT_AGG_BUFFER_RATIO : aggregationBufferRatio; + this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue(); } @JsonProperty @@ -216,6 +221,12 @@ public class HadoopTuningConfig implements TuningConfig return aggregationBufferRatio; } + @JsonProperty + public boolean getUseCombiner() + { + return useCombiner; + } + public HadoopTuningConfig withWorkingPath(String path) { return new HadoopTuningConfig( @@ -234,7 +245,8 @@ public class HadoopTuningConfig implements TuningConfig persistInHeap, ingestOffheap, bufferSize, - aggregationBufferRatio + aggregationBufferRatio, + useCombiner ); } @@ -256,7 +268,8 @@ public class HadoopTuningConfig implements TuningConfig persistInHeap, ingestOffheap, bufferSize, - aggregationBufferRatio + aggregationBufferRatio, + useCombiner ); } @@ -278,7 +291,8 @@ public class HadoopTuningConfig implements TuningConfig persistInHeap, ingestOffheap, bufferSize, - aggregationBufferRatio + aggregationBufferRatio, + useCombiner ); } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index a8b7ed14567..56230251828 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -31,9 +31,9 @@ import com.google.common.primitives.Longs; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; -import com.metamx.common.parsers.ParseException; import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; +import io.druid.data.input.Row; import io.druid.data.input.Rows; import io.druid.offheap.OffheapBufferPool; import io.druid.query.aggregation.AggregatorFactory; @@ -64,11 +64,13 @@ import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.joda.time.DateTime; import org.joda.time.Interval; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Iterator; import java.util.List; import java.util.Set; @@ -149,6 +151,11 @@ public class IndexGeneratorJob implements Jobby throw new RuntimeException("No buckets?? 
seems there is no data to index."); } + if(config.getSchema().getTuningConfig().getUseCombiner()) { + job.setCombinerClass(IndexGeneratorCombiner.class); + job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class); + } + job.setNumReduceTasks(numReducers); job.setPartitionerClass(IndexGeneratorPartitioner.class); @@ -193,6 +200,36 @@ public class IndexGeneratorJob implements Jobby } } + private static IncrementalIndex makeIncrementalIndex( + Bucket theBucket, + AggregatorFactory[] aggs, + HadoopDruidIndexerConfig config, + boolean isOffHeap, + StupidPool bufferPool + ) + { + final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig(); + final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() + .withMinTimestamp(theBucket.time.getMillis()) + .withDimensionsSpec(config.getSchema().getDataSchema().getParser()) + .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity()) + .withMetrics(aggs) + .build(); + if (isOffHeap) { + return new OffheapIncrementalIndex( + indexSchema, + bufferPool, + true, + tuningConfig.getBufferSize() + ); + } else { + return new OnheapIncrementalIndex( + indexSchema, + tuningConfig.getRowFlushBoundary() + ); + } + } + public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper { private static final HashFunction hashFunction = Hashing.murmur3_128(); @@ -245,6 +282,128 @@ public class IndexGeneratorJob implements Jobby } } + public static class IndexGeneratorCombiner extends Reducer + { + private HadoopDruidIndexerConfig config; + private AggregatorFactory[] aggregators; + private AggregatorFactory[] combiningAggs; + + @Override + protected void setup(Context context) + throws IOException, InterruptedException + { + config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); + + aggregators = config.getSchema().getDataSchema().getAggregators(); + combiningAggs = new AggregatorFactory[aggregators.length]; + for (int i = 0; i < aggregators.length; ++i) { + combiningAggs[i] = aggregators[i].getCombiningFactory(); + } + } + + @Override + protected void reduce( + final BytesWritable key, Iterable values, final Context context + ) throws IOException, InterruptedException + { + + Iterator iter = values.iterator(); + BytesWritable first = iter.next(); + + if(iter.hasNext()) { + SortableBytes keyBytes = SortableBytes.fromBytesWritable(key); + Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs; + IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, false, null); + index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators)); + + while(iter.hasNext()) { + context.progress(); + InputRow value = InputRowSerde.fromBytes(iter.next().getBytes(), aggregators); + + if(!index.canAppendRow()) { + log.info("current index full due to [%s]. 
creating new index.", index.getOutOfRowsReason()); + flushIndexToContextAndClose(key, index, context); + index = makeIncrementalIndex(bucket, combiningAggs, config, false, null); + } + + index.add(value); + } + + flushIndexToContextAndClose(key, index, context); + } else { + context.write(key, first); + } + } + + private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex index, Context context) throws IOException, InterruptedException + { + Iterator rows = index.iterator(); + while(rows.hasNext()) { + context.progress(); + Row row = rows.next(); + InputRow inputRow = getInputRowFromRow(row, index.getDimensions()); + context.write( + key, + new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs)) + ); + } + index.close(); + } + + private InputRow getInputRowFromRow(final Row row, final List dimensions) { + return new InputRow() + { + @Override + public List getDimensions() + { + return dimensions; + } + + @Override + public long getTimestampFromEpoch() + { + return row.getTimestampFromEpoch(); + } + + @Override + public DateTime getTimestamp() + { + return row.getTimestamp(); + } + + @Override + public List getDimension(String dimension) + { + return row.getDimension(dimension); + } + + @Override + public Object getRaw(String dimension) + { + return row.getRaw(dimension); + } + + @Override + public float getFloatMetric(String metric) + { + return row.getFloatMetric(metric); + } + + @Override + public long getLongMetric(String metric) + { + return row.getLongMetric(metric); + } + + @Override + public int compareTo(Row o) + { + return row.compareTo(o); + } + }; + } + } + public static class IndexGeneratorPartitioner extends Partitioner implements Configurable { private Configuration config; @@ -350,7 +509,13 @@ public class IndexGeneratorJob implements Jobby * config.getSchema().getTuningConfig().getAggregationBufferRatio()); final StupidPool bufferPool = new OffheapBufferPool(aggregationBufferSize); - IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, bufferPool); + IncrementalIndex index = makeIncrementalIndex( + bucket, + combiningAggs, + config, + config.getSchema().getTuningConfig().isIngestOffheap(), + bufferPool + ); try { File baseFlushFile = File.createTempFile("base", "flush"); baseFlushFile.delete(); @@ -392,7 +557,13 @@ public class IndexGeneratorJob implements Jobby // close this index and make a new one, reusing same buffer index.close(); - index = makeIncrementalIndex(bucket, combiningAggs, bufferPool); + index = makeIncrementalIndex( + bucket, + combiningAggs, + config, + config.getSchema().getTuningConfig().isIngestOffheap(), + bufferPool + ); startTime = System.currentTimeMillis(); ++indexCount; } @@ -475,31 +646,6 @@ public class IndexGeneratorJob implements Jobby index.close(); } } - - private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs, StupidPool bufferPool) - { - final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig(); - final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() - .withMinTimestamp(theBucket.time.getMillis()) - .withDimensionsSpec(config.getSchema().getDataSchema().getParser()) - .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity()) - .withMetrics(aggs) - .build(); - if (tuningConfig.isIngestOffheap()) { - - return new OffheapIncrementalIndex( - indexSchema, - bufferPool, - true, - tuningConfig.getBufferSize() - ); - } else { - return new OnheapIncrementalIndex( - indexSchema, - 
tuningConfig.getRowFlushBoundary() - ); - } - } } public static class IndexGeneratorOutputFormat extends TextOutputFormat diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java index 9e95553db47..4fd7a371675 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -153,7 +153,8 @@ public class DetermineHashedPartitionsJobTest false, false, null, - null + null, + false ) ); this.indexerConfig = new HadoopDruidIndexerConfig(ingestionSpec); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java index 256bbee8f14..574acaa44b5 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java @@ -262,7 +262,8 @@ public class DeterminePartitionsJobTest false, false, null, - null + null, + false ) ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java index b3aca9d251b..00b7b36577b 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -197,7 +197,8 @@ public class HadoopDruidIndexerConfigTest false, false, null, - null + null, + false ) ); HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(spec); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java new file mode 100644 index 00000000000..ced64faa398 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java @@ -0,0 +1,179 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package io.druid.indexer; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.metamx.common.Granularity; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.impl.CSVParseSpec; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapreduce.Reducer; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +/** + */ +public class IndexGeneratorCombinerTest +{ + private AggregatorFactory[] aggregators; + private IndexGeneratorJob.IndexGeneratorCombiner combiner; + + @Before + public void setUp() throws Exception + { + HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig( + new HadoopIngestionSpec( + new DataSchema( + "website", + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(ImmutableList.of("host"), null, null), + null, + ImmutableList.of("timestamp", "host", "visited") + ) + ), + new AggregatorFactory[]{ + new LongSumAggregatorFactory("visited_sum", "visited"), + new HyperUniquesAggregatorFactory("unique_hosts", "host") + }, + new UniformGranularitySpec( + Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2010/2011")) + ) + ), + new HadoopIOConfig( + ImmutableMap.of( + "paths", + "/tmp/dummy", + "type", + "static" + ), + null, + "/tmp/dummy" + ), + HadoopTuningConfig.makeDefaultTuningConfig().withWorkingPath("/tmp/work").withVersion("ver") + ) + ); + Configuration hadoopConfig = new Configuration(); + hadoopConfig.set( + HadoopDruidIndexerConfig.CONFIG_PROPERTY, + HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(config) + ); + + Reducer.Context context = EasyMock.createMock(Reducer.Context.class); + EasyMock.expect(context.getConfiguration()).andReturn(hadoopConfig); + EasyMock.replay(context); + + aggregators = config.getSchema().getDataSchema().getAggregators(); + + combiner = new IndexGeneratorJob.IndexGeneratorCombiner(); + combiner.setup(context); + } + + @Test + public void testSingleRowNoMergePassThrough() throws Exception + { + Reducer.Context context = EasyMock.createMock(Reducer.Context.class); + Capture captureKey = Capture.newInstance(); + Capture captureVal = Capture.newInstance(); + context.write(EasyMock.capture(captureKey), EasyMock.capture(captureVal)); + EasyMock.replay(context); + + BytesWritable key = new BytesWritable("dummy_key".getBytes()); + BytesWritable val = new BytesWritable("dummy_row".getBytes()); + + combiner.reduce(key, Lists.newArrayList(val), context); + + Assert.assertTrue(captureKey.getValue() == key); + Assert.assertTrue(captureVal.getValue() == val); + } + + @Test + public void testMultipleRowsMerged() throws Exception + { + long timestamp = 
System.currentTimeMillis(); + + Bucket bucket = new Bucket(0, new DateTime(timestamp), 0); + SortableBytes keySortableBytes = new SortableBytes( + bucket.toGroupKey(), + new byte[0] + ); + BytesWritable key = keySortableBytes.toBytesWritable(); + + InputRow row1 = new MapBasedInputRow( + timestamp, + ImmutableList.of(), + ImmutableMap.of( + "host", "host1", + "visited", 10 + ) + ); + InputRow row2 = new MapBasedInputRow( + timestamp, + ImmutableList.of(), + ImmutableMap.of( + "host", "host2", + "visited", 5 + ) + ); + List rows = Lists.newArrayList( + new BytesWritable(InputRowSerde.toBytes(row1, aggregators)), + new BytesWritable(InputRowSerde.toBytes(row2, aggregators)) + ); + + Reducer.Context context = EasyMock.createNiceMock(Reducer.Context.class); + Capture captureKey = Capture.newInstance(); + Capture captureVal = Capture.newInstance(); + context.write(EasyMock.capture(captureKey), EasyMock.capture(captureVal)); + EasyMock.replay(context); + + combiner.reduce( + key, + rows, + context + ); + + Assert.assertTrue(captureKey.getValue() == key); + + InputRow capturedRow = InputRowSerde.fromBytes(captureVal.getValue().getBytes(), aggregators); + Assert.assertEquals(15, capturedRow.getLongMetric("visited_sum")); + Assert.assertEquals(2.0, (Double)HyperUniquesAggregatorFactory.estimateCardinality(capturedRow.getRaw("unique_hosts")), 0.001); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index bfaac4a465b..958b64041f1 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -35,6 +35,7 @@ import io.druid.granularity.QueryGranularity; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.timeline.DataSegment; @@ -70,6 +71,7 @@ public class IndexGeneratorJobTest return Arrays.asList( new Object[][]{ { + false, "single", "2014-10-22T00:00:00Z/P2D", new String[][][]{ @@ -112,6 +114,7 @@ public class IndexGeneratorJobTest ) }, { + false, "hashed", "2014-10-22T00:00:00Z/P1D", new Integer[][][]{ @@ -139,7 +142,41 @@ public class IndexGeneratorJobTest "2014102213,n.example.com,234", "2014102214,o.example.com,325", "2014102215,p.example.com,3533", - "2014102216,q.example.com,587" + "2014102216,q.example.com,500", + "2014102216,q.example.com,87" + ) + }, + { + true, + "hashed", + "2014-10-22T00:00:00Z/P1D", + new Integer[][][]{ + { + { 0, 4 }, + { 1, 4 }, + { 2, 4 }, + { 3, 4 } + } + }, + ImmutableList.of( + "2014102200,a.example.com,100", + "2014102201,b.exmaple.com,50", + "2014102202,c.example.com,200", + "2014102203,d.example.com,250", + "2014102204,e.example.com,123", + "2014102205,f.example.com,567", + "2014102206,g.example.com,11", + "2014102207,h.example.com,251", + "2014102208,i.example.com,963", + "2014102209,j.example.com,333", + "2014102210,k.example.com,253", + "2014102211,l.example.com,321", + "2014102212,m.example.com,3125", + "2014102213,n.example.com,234", + "2014102214,o.example.com,325", + "2014102215,p.example.com,3533", + "2014102216,q.example.com,500", + "2014102216,q.example.com,87" ) } } @@ -156,14 +193,17 @@ public class 
IndexGeneratorJobTest private String partitionType; private Object[][][] shardInfoForEachSegment; private List data; + private boolean useCombiner; public IndexGeneratorJobTest( + boolean useCombiner, String partitionType, String interval, Object[][][] shardInfoForEachSegment, List data ) throws IOException { + this.useCombiner = useCombiner; this.partitionType = partitionType; this.shardInfoForEachSegment = shardInfoForEachSegment; this.interval = new Interval(interval); @@ -196,7 +236,10 @@ public class IndexGeneratorJobTest ImmutableList.of("timestamp", "host", "visited_num") ) ), - new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")}, + new AggregatorFactory[]{ + new LongSumAggregatorFactory("visited_num", "visited_num"), + new HyperUniquesAggregatorFactory("unique_hosts", "host") + }, new UniformGranularitySpec( Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval) ) @@ -227,7 +270,8 @@ public class IndexGeneratorJobTest false, false, null, - null + null, + useCombiner ) ) ); @@ -325,6 +369,7 @@ public class IndexGeneratorJobTest Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path")); Assert.assertEquals("host", dataSegment.getDimensions().get(0)); Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0)); + Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1)); Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion()); if (partitionType.equals("hashed")) { Integer[] hashShardInfo = (Integer[]) shardInfo[partitionNum]; diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java index 8886c6e8a3c..3334a70ca89 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java @@ -111,7 +111,8 @@ public class JobHelperTest false, false, null, - null + null, + false ) ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java index 91e56ce6f9f..74fc2b69ea2 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java @@ -268,7 +268,8 @@ public class HadoopConverterJobTest false, false, null, - null + null, + false ) ) ); From 0eec1bbee2d25f4fa15667376723133013cb8ef2 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 15 Jul 2015 13:19:36 -0500 Subject: [PATCH 5/5] json serde tests for HadoopTuningConfig --- .../druid/indexer/HadoopTuningConfigTest.java | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java new file mode 100644 index 00000000000..e7bbd46f428 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java @@ -0,0 +1,93 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.segment.IndexSpec; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +/** + */ +public class HadoopTuningConfigTest +{ + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Test + public void testSerde() throws Exception + { + HadoopTuningConfig expected = new HadoopTuningConfig( + "/tmp/workingpath", + "version", + null, + null, + null, + 100, + true, + true, + true, + true, + null, + true, + true, + true, + 200, + 0.1f, + true + ); + + HadoopTuningConfig actual = jsonReadWriteRead(jsonMapper.writeValueAsString(expected), HadoopTuningConfig.class); + + Assert.assertEquals("/tmp/workingpath", actual.getWorkingPath()); + Assert.assertEquals("version", actual.getVersion()); + Assert.assertNotNull(actual.getPartitionsSpec()); + Assert.assertEquals(ImmutableMap.>of(), actual.getShardSpecs()); + Assert.assertEquals(new IndexSpec(), actual.getIndexSpec()); + Assert.assertEquals(100, actual.getRowFlushBoundary()); + Assert.assertEquals(true, actual.isLeaveIntermediate()); + Assert.assertEquals(true, actual.isCleanupOnFailure()); + Assert.assertEquals(true, actual.isOverwriteFiles()); + Assert.assertEquals(true, actual.isIgnoreInvalidRows()); + Assert.assertEquals(ImmutableMap.of(), actual.getJobProperties()); + Assert.assertEquals(true, actual.isCombineText()); + Assert.assertEquals(true, actual.isPersistInHeap()); + Assert.assertEquals(true, actual.isIngestOffheap()); + Assert.assertEquals(200, actual.getBufferSize()); + Assert.assertEquals(0.1f, actual.getAggregationBufferRatio(), 0.0001); + Assert.assertEquals(true, actual.getUseCombiner()); + + } + + public static T jsonReadWriteRead(String s, Class klass) + { + try { + return jsonMapper.readValue(jsonMapper.writeValueAsBytes(jsonMapper.readValue(s, klass)), klass); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } +}
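
A minimal usage sketch (not part of the patches above) of the mapper-to-reducer round trip that patches 2 and 3 introduce. It uses only APIs added or referenced in this series (InputRowSerde, MapBasedInputRow, LongSumAggregatorFactory); the class name and sample values are illustrative assumptions, not code from the series.

    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;
    import io.druid.data.input.InputRow;
    import io.druid.data.input.MapBasedInputRow;
    import io.druid.indexer.InputRowSerde;
    import io.druid.query.aggregation.AggregatorFactory;
    import io.druid.query.aggregation.LongSumAggregatorFactory;

    public class InputRowSerdeRoundTripSketch
    {
      public static void main(String[] args)
      {
        // The same aggregator list must be passed to toBytes() and fromBytes().
        AggregatorFactory[] aggs = new AggregatorFactory[]{
            new LongSumAggregatorFactory("visited_sum", "visited")
        };

        InputRow row = new MapBasedInputRow(
            System.currentTimeMillis(),
            ImmutableList.of("host"),
            ImmutableMap.<String, Object>of("host", "a.example.com", "visited", 10)
        );

        // Mapper side: aggregate the single row and serialize timestamp, dims and metrics.
        byte[] bytes = InputRowSerde.toBytes(row, aggs);

        // Combiner/reducer side: rebuild an InputRow carrying the intermediate aggregates.
        InputRow restored = InputRowSerde.fromBytes(bytes, aggs);

        System.out.println(restored.getDimension("host"));         // [a.example.com]
        System.out.println(restored.getLongMetric("visited_sum")); // 10
      }
    }

This is the same round trip exercised by InputRowSerdeTest and IndexGeneratorCombinerTest above; in the job itself, the mapper emits these bytes as BytesWritable values, and the optional combiner added in patch 4 pre-merges them before the reducer builds the incremental index.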