Merge pull request #1097 from metamx/better-hadoop-sort-key

Sort HadoopIndexer rows by time+dim bucket to help reduce spilling
Fangjin Yang 2015-02-25 12:49:58 -08:00
commit 6424815f88
2 changed files with 27 additions and 4 deletions

HadoopDruidIndexerMapper.java

@@ -32,6 +32,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
{
private HadoopDruidIndexerConfig config;
private StringInputRowParser parser;
+ protected GranularitySpec granularitySpec;
@Override
protected void setup(Context context)
@@ -39,6 +40,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
{
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
parser = config.getParser();
+ granularitySpec = config.getGranularitySpec();
}
public HadoopDruidIndexerConfig getConfig()
@@ -69,9 +71,10 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
throw e;
}
}
- GranularitySpec spec = config.getGranularitySpec();
- if (!spec.bucketIntervals().isPresent() || spec.bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()))
-                                                .isPresent()) {
+ if (!granularitySpec.bucketIntervals().isPresent()
+     || granularitySpec.bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()))
+                       .isPresent()) {
innerMap(inputRow, value, context);
}
}
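
The caching above matters because the mapper in the second file needs the query granularity on every row: granularitySpec.getQueryGranularity().truncate(...) floors each timestamp to the start of its time bucket. A rough, JDK-only sketch of what that truncation means for an hourly granularity (illustrative only; Druid's QueryGranularity implementations support arbitrary granularities):

import java.util.concurrent.TimeUnit;

public class TruncateSketch
{
  // Floor a millisecond timestamp to the start of its hour bucket,
  // which is roughly what an hourly QueryGranularity.truncate() does.
  static long truncateToHour(long timestampMillis)
  {
    long hourMillis = TimeUnit.HOURS.toMillis(1);
    return timestampMillis - (timestampMillis % hourMillis);
  }

  public static void main(String[] args)
  {
    long ts = 1424880123456L;               // 2015-02-25T16:02:03.456Z
    System.out.println(truncateToHour(ts)); // 1424880000000 = 2015-02-25T16:00:00.000Z
  }
}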

IndexGeneratorJob.java

@@ -26,6 +26,8 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+ import com.google.common.hash.HashFunction;
+ import com.google.common.hash.Hashing;
import com.google.common.io.Closeables;
import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
@@ -34,7 +36,9 @@ import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
+ import io.druid.data.input.Rows;
import io.druid.data.input.impl.StringInputRowParser;
+ import io.druid.granularity.QueryGranularity;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
@@ -211,6 +215,8 @@ public class IndexGeneratorJob implements Jobby
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text>
{
+ private static final HashFunction hashFunction = Hashing.murmur3_128();
@Override
protected void innerMap(
InputRow inputRow,
@@ -225,10 +231,24 @@ public class IndexGeneratorJob implements Jobby
throw new ISE("WTF?! No bucket found for row: %s", inputRow);
}
+ final long truncatedTimestamp = granularitySpec.getQueryGranularity().truncate(inputRow.getTimestampFromEpoch());
+ final byte[] hashedDimensions = hashFunction.hashBytes(
+     HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(
+         Rows.toGroupKey(
+             truncatedTimestamp,
+             inputRow
+         )
+     )
+ ).asBytes();
context.write(
new SortableBytes(
bucket.get().toGroupKey(),
-     Longs.toByteArray(inputRow.getTimestampFromEpoch())
+     // sort rows by truncated timestamp and hashed dimensions to help reduce spilling on the reducer side
+     ByteBuffer.allocate(Longs.BYTES + hashedDimensions.length)
+               .putLong(truncatedTimestamp)
+               .put(hashedDimensions)
+               .array()
).toBytesWritable(),
text
);
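
Taken together, the new reducer sort key is an 8-byte truncated timestamp followed by the 16-byte murmur3_128 hash of the row's group key, so rows that share a time bucket and dimension combination sort next to each other. A minimal, self-contained sketch of that key layout (class, method, and the group-key encoding here are illustrative; the real mapper hashes the JSON-serialized output of Rows.toGroupKey as shown above):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Longs;

public class SortKeySketch
{
  private static final HashFunction HASH = Hashing.murmur3_128();

  // Pack (truncated timestamp, hash of serialized dimensions) into one sortable byte array.
  static byte[] sortKey(long truncatedTimestamp, byte[] serializedGroupKey)
  {
    byte[] hashedDimensions = HASH.hashBytes(serializedGroupKey).asBytes();
    return ByteBuffer.allocate(Longs.BYTES + hashedDimensions.length)
                     .putLong(truncatedTimestamp)
                     .put(hashedDimensions)
                     .array();
  }

  public static void main(String[] args)
  {
    long hourBucket = 1424880000000L; // a timestamp already floored to its hour bucket
    byte[] groupKey = "host=web01,country=US".getBytes(StandardCharsets.UTF_8); // illustrative encoding
    byte[] key = sortKey(hourBucket, groupKey);
    System.out.println(key.length);   // 8 + 16 = 24 bytes
  }
}

Because the hash is fixed-width, plain lexicographic byte comparison orders keys by time bucket first and dimension hash second, which is what lets rows that aggregate into the same output row arrive consecutively at the reducer, per the commit's goal of reducing spilling.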