take control of InputRow serde between Mapper/Reducer in Hadoop Indexing

This allows arbitrary InputFormats to be used during Hadoop batch ingestion, including formats
that return records whose value type is not Text.
Himanshu Gupta 2015-07-15 13:00:57 -05:00
parent f7a92db332
commit 4ef484048a
4 changed files with 42 additions and 42 deletions
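
In short: the mapper now serializes each parsed InputRow itself (via the new InputRowSerde) and ships it to the reducer as BytesWritable, so the shuffle no longer depends on the InputFormat's value class. Below is a minimal sketch of that round trip, using only the calls visible in the diff (InputRowSerde.toBytes/fromBytes and the BytesWritable wrapping); the surrounding class and method names are illustrative, and InputRowSerde is assumed to be in the same indexer package.

    import io.druid.data.input.InputRow;
    import io.druid.query.aggregation.AggregatorFactory;
    import org.apache.hadoop.io.BytesWritable;

    class InputRowShuffleSketch
    {
      // Mapper side: the row has already been parsed by the configured
      // InputRowParser, so only Druid-controlled bytes go into the shuffle.
      static BytesWritable toShuffleValue(InputRow row, AggregatorFactory[] aggregators)
      {
        return new BytesWritable(InputRowSerde.toBytes(row, aggregators));
      }

      // Reducer side: rebuild the row from the bytes; no parser needed here.
      static InputRow fromShuffleValue(BytesWritable value, AggregatorFactory[] aggregators)
      {
        return InputRowSerde.fromBytes(value.getBytes(), aggregators);
      }
    }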

DetermineHashedPartitionsJob.java

@@ -235,7 +235,7 @@ public class DetermineHashedPartitionsJob implements Jobby
@Override
protected void innerMap(
InputRow inputRow,
-Writable value,
+Object value,
Context context
) throws IOException, InterruptedException
{

DeterminePartitionsJob.java

@@ -259,7 +259,7 @@ public class DeterminePartitionsJob implements Jobby
@Override
protected void innerMap(
InputRow inputRow,
-Writable value,
+Object value,
Context context
) throws IOException, InterruptedException
{
@@ -340,7 +340,7 @@ public class DeterminePartitionsJob implements Jobby
@Override
protected void innerMap(
InputRow inputRow,
-Writable value,
+Object value,
Context context
) throws IOException, InterruptedException
{
@@ -378,7 +378,7 @@ public class DeterminePartitionsJob implements Jobby
}
public void emitDimValueCounts(
-TaskInputOutputContext<? extends Writable, ? extends Writable, BytesWritable, Text> context,
+TaskInputOutputContext<?, ?, BytesWritable, Text> context,
DateTime timestamp,
Map<String, Iterable<String>> dims
) throws IOException, InterruptedException
@@ -891,7 +891,7 @@ public class DeterminePartitionsJob implements Jobby
}
private static void write(
-TaskInputOutputContext<? extends Writable, ? extends Writable, BytesWritable, Text> context,
+TaskInputOutputContext<?, ?, BytesWritable, Text> context,
final byte[] groupKey,
DimValueCount dimValueCount
)

HadoopDruidIndexerMapper.java

@@ -41,11 +41,11 @@ import org.joda.time.DateTime;
import com.metamx.common.RE;
-public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Writable, Writable, KEYOUT, VALUEOUT>
+public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Object, Object, KEYOUT, VALUEOUT>
{
private static final Logger log = new Logger(HadoopDruidIndexerMapper.class);
-private HadoopDruidIndexerConfig config;
+protected HadoopDruidIndexerConfig config;
private InputRowParser parser;
protected GranularitySpec granularitySpec;
@@ -70,7 +70,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
@Override
protected void map(
-Writable key, Writable value, Context context
+Object key, Object value, Context context
) throws IOException, InterruptedException
{
try {
@@ -99,7 +99,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
}
}
-public final static InputRow parseInputRow(Writable value, InputRowParser parser)
+public final static InputRow parseInputRow(Object value, InputRowParser parser)
{
if(parser instanceof StringInputRowParser && value instanceof Text) {
//Note: This is to ensure backward compatibility with 0.7.0 and before
@@ -109,7 +109,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
}
}
-abstract protected void innerMap(InputRow inputRow, Writable value, Context context)
+abstract protected void innerMap(InputRow inputRow, Object value, Context context)
throws IOException, InterruptedException;
}
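
The widened Object signatures above are what let a custom InputFormat emit values that are not Text. The sketch below shows the dispatch this implies: only the StringInputRowParser/Text branch is quoted in the diff, and the generic fallback to parser.parse(value) is an assumption about the new code rather than a quote from it.

    import io.druid.data.input.InputRow;
    import io.druid.data.input.impl.InputRowParser;
    import io.druid.data.input.impl.StringInputRowParser;
    import org.apache.hadoop.io.Text;

    class ParseDispatchSketch
    {
      @SuppressWarnings("unchecked")
      static InputRow parse(Object value, InputRowParser parser)
      {
        if (parser instanceof StringInputRowParser && value instanceof Text) {
          // backward compatibility with 0.7.0 and before: Text records keep
          // flowing through the string parser exactly as they used to
          return ((StringInputRowParser) parser).parse(value.toString());
        } else {
          // any other record type (Avro object, Map, custom Writable, ...) is
          // handed to whatever parser the ingestion spec configured
          return parser.parse(value);
        }
      }
    }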

IndexGeneratorJob.java

@@ -35,7 +35,6 @@ import com.metamx.common.parsers.ParseException;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
-import io.druid.data.input.impl.InputRowParser;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
@@ -141,7 +140,7 @@ public class IndexGeneratorJob implements Jobby
JobHelper.setInputFormat(job, config);
job.setMapperClass(IndexGeneratorMapper.class);
-job.setMapOutputValueClass(Text.class);
+job.setMapOutputValueClass(BytesWritable.class);
SortableBytes.useSortableBytesAsMapOutputKey(job);
@@ -149,6 +148,7 @@ public class IndexGeneratorJob implements Jobby
if (numReducers == 0) {
throw new RuntimeException("No buckets?? seems there is no data to index.");
}
job.setNumReduceTasks(numReducers);
job.setPartitionerClass(IndexGeneratorPartitioner.class);
@@ -193,14 +193,24 @@ public class IndexGeneratorJob implements Jobby
}
}
-public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Writable>
+public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, BytesWritable>
{
private static final HashFunction hashFunction = Hashing.murmur3_128();
+private AggregatorFactory[] aggregators;
+@Override
+protected void setup(Context context)
+throws IOException, InterruptedException
+{
+super.setup(context);
+aggregators = config.getSchema().getDataSchema().getAggregators();
+}
@Override
protected void innerMap(
InputRow inputRow,
-Writable value,
+Object value,
Context context
) throws IOException, InterruptedException
{
@@ -230,7 +240,7 @@ public class IndexGeneratorJob implements Jobby
.put(hashedDimensions)
.array()
).toBytesWritable(),
-value
+new BytesWritable(InputRowSerde.toBytes(inputRow, aggregators))
);
}
}
@@ -269,11 +279,12 @@ public class IndexGeneratorJob implements Jobby
}
}
-public static class IndexGeneratorReducer extends Reducer<BytesWritable, Writable, BytesWritable, Text>
+public static class IndexGeneratorReducer extends Reducer<BytesWritable, BytesWritable, BytesWritable, Text>
{
protected HadoopDruidIndexerConfig config;
private List<String> metricNames = Lists.newArrayList();
-private InputRowParser parser;
+private AggregatorFactory[] aggregators;
+private AggregatorFactory[] combiningAggs;
protected ProgressIndicator makeProgressIndicator(final Context context)
{
@@ -317,29 +328,29 @@ public class IndexGeneratorJob implements Jobby
{
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
-for (AggregatorFactory factory : config.getSchema().getDataSchema().getAggregators()) {
-metricNames.add(factory.getName());
+aggregators = config.getSchema().getDataSchema().getAggregators();
+combiningAggs = new AggregatorFactory[aggregators.length];
+for (int i = 0; i < aggregators.length; ++i) {
+metricNames.add(aggregators[i].getName());
+combiningAggs[i] = aggregators[i].getCombiningFactory();
}
-parser = config.getParser();
}
@Override
protected void reduce(
-BytesWritable key, Iterable<Writable> values, final Context context
+BytesWritable key, Iterable<BytesWritable> values, final Context context
) throws IOException, InterruptedException
{
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
-final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators();
final int maxTotalBufferSize = config.getSchema().getTuningConfig().getBufferSize();
final int aggregationBufferSize = (int) ((double) maxTotalBufferSize
* config.getSchema().getTuningConfig().getAggregationBufferRatio());
final StupidPool<ByteBuffer> bufferPool = new OffheapBufferPool(aggregationBufferSize);
-IncrementalIndex index = makeIncrementalIndex(bucket, aggs, bufferPool);
+IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, bufferPool);
try {
File baseFlushFile = File.createTempFile("base", "flush");
baseFlushFile.delete();
@@ -354,24 +365,13 @@ public class IndexGeneratorJob implements Jobby
Set<String> allDimensionNames = Sets.newHashSet();
final ProgressIndicator progressIndicator = makeProgressIndicator(context);
-for (final Writable value : values) {
+for (final BytesWritable bw : values) {
context.progress();
-int numRows;
-try {
-final InputRow inputRow = index.formatRow(HadoopDruidIndexerMapper.parseInputRow(value, parser));
-allDimensionNames.addAll(inputRow.getDimensions());
-numRows = index.add(inputRow);
-}
-catch (ParseException e) {
-if (config.isIgnoreInvalidRows()) {
-log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
-context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
-continue;
-} else {
-throw e;
-}
-}
+final InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(bw.getBytes(), aggregators));
+allDimensionNames.addAll(inputRow.getDimensions());
+int numRows = index.add(inputRow);
++lineCount;
if (!index.canAppendRow()) {
@@ -391,8 +391,8 @@ public class IndexGeneratorJob implements Jobby
persist(index, interval, file, progressIndicator);
// close this index and make a new one, reusing same buffer
index.close();
-index = makeIncrementalIndex(bucket, aggs, bufferPool);
+index = makeIncrementalIndex(bucket, combiningAggs, bufferPool);
startTime = System.currentTimeMillis();
++indexCount;
}
@@ -421,7 +421,7 @@ public class IndexGeneratorJob implements Jobby
indexes.add(IndexIO.loadIndex(file));
}
mergedBase = mergeQueryableIndex(
-indexes, aggs, new File(baseFlushFile, "merged"), progressIndicator
+indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator
);
}
final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath())
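
Why the reducer switches from the raw aggregators to getCombiningFactory(): rows coming out of InputRowSerde.fromBytes presumably already carry computed metric columns (InputRowSerde itself is not part of this diff), so the reduce-side IncrementalIndex has to combine those columns rather than re-aggregate raw input fields. A self-contained illustration of the pattern, using LongSumAggregatorFactory as an arbitrary example aggregator and hypothetical column names:

    import io.druid.query.aggregation.AggregatorFactory;
    import io.druid.query.aggregation.LongSumAggregatorFactory;

    class CombiningAggsSketch
    {
      public static void main(String[] args)
      {
        // At ingest time this sums the raw "visits" field into a "visits_sum" column.
        AggregatorFactory atIngest = new LongSumAggregatorFactory("visits_sum", "visits");

        // Its combining form sums the already materialized "visits_sum" column,
        // which is what the reducer needs once the mapper has serialized rows
        // with InputRowSerde.toBytes(inputRow, aggregators).
        AggregatorFactory combining = atIngest.getCombiningFactory();

        System.out.println(combining.getName()); // visits_sum
      }
    }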