diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index f4e96b5cc05..e217b82ee3e 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -235,7 +235,7 @@ public class DetermineHashedPartitionsJob implements Jobby
     @Override
     protected void innerMap(
         InputRow inputRow,
-        Writable value,
+        Object value,
         Context context
     ) throws IOException, InterruptedException
     {
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
index 62dd90470ee..a21e5f7437b 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
@@ -259,7 +259,7 @@ public class DeterminePartitionsJob implements Jobby
     @Override
     protected void innerMap(
         InputRow inputRow,
-        Writable value,
+        Object value,
         Context context
     ) throws IOException, InterruptedException
     {
@@ -340,7 +340,7 @@ public class DeterminePartitionsJob implements Jobby
     @Override
     protected void innerMap(
         InputRow inputRow,
-        Writable value,
+        Object value,
         Context context
     ) throws IOException, InterruptedException
     {
@@ -378,7 +378,7 @@ public class DeterminePartitionsJob implements Jobby
     }
 
     public void emitDimValueCounts(
-        TaskInputOutputContext<? extends Writable, ? extends Writable, BytesWritable, Text> context,
+        TaskInputOutputContext<?, ?, BytesWritable, Text> context,
         DateTime timestamp,
         Map<String, Iterable<String>> dims
     ) throws IOException, InterruptedException
@@ -891,7 +891,7 @@ public class DeterminePartitionsJob implements Jobby
   }
 
   private static void write(
-      TaskInputOutputContext<? extends Writable, ? extends Writable, BytesWritable, Text> context,
+      TaskInputOutputContext<?, ?, BytesWritable, Text> context,
      final byte[] groupKey,
      DimValueCount dimValueCount
   )
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
index bd561106f70..e90a0c59018 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
@@ -41,11 +41,11 @@
 import org.joda.time.DateTime;
 
 import com.metamx.common.RE;
 
-public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Writable, Writable, KEYOUT, VALUEOUT>
+public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Object, Object, KEYOUT, VALUEOUT>
 {
   private static final Logger log = new Logger(HadoopDruidIndexerMapper.class);
 
-  private HadoopDruidIndexerConfig config;
+  protected HadoopDruidIndexerConfig config;
   private InputRowParser parser;
   protected GranularitySpec granularitySpec;
@@ -70,7 +70,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
   @Override
   protected void map(
-      Writable key, Writable value, Context context
+      Object key, Object value, Context context
   ) throws IOException, InterruptedException
   {
     try {
@@ -99,7 +99,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
     }
   }
 
-  public final static InputRow parseInputRow(Writable value, InputRowParser parser)
+  public final static InputRow parseInputRow(Object value, InputRowParser parser)
   {
     if(parser instanceof StringInputRowParser && value instanceof Text) {
       //Note: This is to ensure backward compatibility with 0.7.0 and before
@@ -109,7 +109,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
     }
   }
 
-  abstract protected void innerMap(InputRow inputRow, Writable value, Context context)
+  abstract protected void innerMap(InputRow inputRow, Object value, Context context)
       throws IOException, InterruptedException;
 
 }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index f69af4aa27e..a8b7ed14567 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -35,7 +35,6 @@ import com.metamx.common.parsers.ParseException;
 import io.druid.collections.StupidPool;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.Rows;
-import io.druid.data.input.impl.InputRowParser;
 import io.druid.offheap.OffheapBufferPool;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.IndexIO;
@@ -141,7 +140,7 @@ public class IndexGeneratorJob implements Jobby
       JobHelper.setInputFormat(job, config);
 
       job.setMapperClass(IndexGeneratorMapper.class);
-      job.setMapOutputValueClass(Text.class);
+      job.setMapOutputValueClass(BytesWritable.class);
 
       SortableBytes.useSortableBytesAsMapOutputKey(job);
 
@@ -149,6 +148,7 @@ public class IndexGeneratorJob implements Jobby
       if (numReducers == 0) {
         throw new RuntimeException("No buckets?? seems there is no data to index.");
       }
+      job.setNumReduceTasks(numReducers);
 
       job.setPartitionerClass(IndexGeneratorPartitioner.class);
 
@@ -193,14 +193,24 @@ public class IndexGeneratorJob implements Jobby
     }
   }
 
-  public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text>
+  public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, BytesWritable>
   {
     private static final HashFunction hashFunction = Hashing.murmur3_128();
 
+    private AggregatorFactory[] aggregators;
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException
+    {
+      super.setup(context);
+      aggregators = config.getSchema().getDataSchema().getAggregators();
+    }
+
     @Override
     protected void innerMap(
         InputRow inputRow,
-        Writable value,
+        Object value,
         Context context
     ) throws IOException, InterruptedException
     {
@@ -230,7 +240,7 @@ public class IndexGeneratorJob implements Jobby
                         .put(hashedDimensions)
                         .array()
           ).toBytesWritable(),
-          value
+          new BytesWritable(InputRowSerde.toBytes(inputRow, aggregators))
       );
     }
   }
@@ -269,11 +279,12 @@ public class IndexGeneratorJob implements Jobby
     }
   }
 
-  public static class IndexGeneratorReducer extends Reducer<BytesWritable, Text, BytesWritable, Text>
+  public static class IndexGeneratorReducer extends Reducer<BytesWritable, BytesWritable, BytesWritable, Text>
   {
     protected HadoopDruidIndexerConfig config;
     private List<String> metricNames = Lists.newArrayList();
-    private InputRowParser parser;
+    private AggregatorFactory[] aggregators;
+    private AggregatorFactory[] combiningAggs;
 
     protected ProgressIndicator makeProgressIndicator(final Context context)
     {
@@ -317,29 +328,29 @@ public class IndexGeneratorJob implements Jobby
     {
       config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
 
-      for (AggregatorFactory factory : config.getSchema().getDataSchema().getAggregators()) {
-        metricNames.add(factory.getName());
+      aggregators = config.getSchema().getDataSchema().getAggregators();
+      combiningAggs = new AggregatorFactory[aggregators.length];
+      for (int i = 0; i < aggregators.length; ++i) {
+        metricNames.add(aggregators[i].getName());
+        combiningAggs[i] = aggregators[i].getCombiningFactory();
       }
-
-      parser = config.getParser();
     }
 
     @Override
     protected void reduce(
-        BytesWritable key, Iterable<Text> values, final Context context
+        BytesWritable key, Iterable<BytesWritable> values, final Context context
     ) throws IOException, InterruptedException
     {
       SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
       Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
 
       final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
-      final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators();
 
      final int maxTotalBufferSize = config.getSchema().getTuningConfig().getBufferSize();
      final int aggregationBufferSize = (int) ((double) maxTotalBufferSize
                                               * config.getSchema().getTuningConfig().getAggregationBufferRatio());
 
      final StupidPool<ByteBuffer> bufferPool = new OffheapBufferPool(aggregationBufferSize);
-      IncrementalIndex index = makeIncrementalIndex(bucket, aggs, bufferPool);
+      IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, bufferPool);
       try {
         File baseFlushFile = File.createTempFile("base", "flush");
         baseFlushFile.delete();
@@ -354,24 +365,13 @@ public class IndexGeneratorJob implements Jobby
 
         Set<String> allDimensionNames = Sets.newHashSet();
         final ProgressIndicator progressIndicator = makeProgressIndicator(context);
 
-        for (final Writable value : values) {
+        for (final BytesWritable bw : values) {
           context.progress();
-          int numRows;
-          try {
-            final InputRow inputRow = index.formatRow(HadoopDruidIndexerMapper.parseInputRow(value, parser));
-            allDimensionNames.addAll(inputRow.getDimensions());
-            numRows = index.add(inputRow);
-          }
-          catch (ParseException e) {
-            if (config.isIgnoreInvalidRows()) {
-              log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
-              context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
-              continue;
-            } else {
-              throw e;
-            }
-          }
+          final InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(bw.getBytes(), aggregators));
+          allDimensionNames.addAll(inputRow.getDimensions());
+          int numRows = index.add(inputRow);
+
           ++lineCount;
 
           if (!index.canAppendRow()) {
@@ -391,8 +391,8 @@ public class IndexGeneratorJob implements Jobby
             persist(index, interval, file, progressIndicator);
             // close this index and make a new one, reusing same buffer
             index.close();
-            index = makeIncrementalIndex(bucket, aggs, bufferPool);
+            index = makeIncrementalIndex(bucket, combiningAggs, bufferPool);
 
             startTime = System.currentTimeMillis();
             ++indexCount;
           }
@@ -421,7 +421,7 @@ public class IndexGeneratorJob implements Jobby
             indexes.add(IndexIO.loadIndex(file));
           }
           mergedBase = mergeQueryableIndex(
-              indexes, aggs, new File(baseFlushFile, "merged"), progressIndicator
+              indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator
          );
        }
        final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath())
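
Note (not part of the patch): the heart of this change is that the index-generator mapper now serializes each parsed InputRow with InputRowSerde.toBytes(inputRow, aggregators) and emits it as a BytesWritable, and the reducer rebuilds the row with InputRowSerde.fromBytes(...) instead of re-running the InputRowParser on Text, so each row is parsed only once. The sketch below only illustrates that round trip; it assumes the toBytes/fromBytes signatures used in the hunks above and that InputRowSerde lives in io.druid.indexer, and the sample row, aggregator, and class name are made up for the example.

import com.google.common.collect.ImmutableMap;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.indexer.InputRowSerde;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.hadoop.io.BytesWritable;

import java.util.Arrays;
import java.util.Map;

public class InputRowSerdeRoundTripExample
{
  public static void main(String[] args)
  {
    // The same aggregator array must be used on both sides; it determines which
    // metric columns are written into the serialized form.
    AggregatorFactory[] aggregators = new AggregatorFactory[]{
        new LongSumAggregatorFactory("visited_sum", "visited")
    };

    Map<String, Object> event = ImmutableMap.<String, Object>of(
        "host", "a.example.com",
        "visited", 100L
    );
    InputRow row = new MapBasedInputRow(System.currentTimeMillis(), Arrays.asList("host"), event);

    // Mapper side: parse once, serialize, and emit the bytes as the map output value.
    BytesWritable value = new BytesWritable(InputRowSerde.toBytes(row, aggregators));

    // Reducer side: rebuild the row from the bytes instead of re-parsing the original text.
    InputRow rebuilt = InputRowSerde.fromBytes(value.getBytes(), aggregators);
    System.out.println(rebuilt.getDimension("host") + " -> " + rebuilt.getFloatMetric("visited_sum"));
  }
}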