review comments

This commit is contained in:
nishantmonu51 2014-01-31 20:19:33 +05:30
parent 97e5d68635
commit 82b748ad43
8 changed files with 98 additions and 109 deletions

View File

@ -62,31 +62,22 @@ import java.util.Map;
import java.util.Set;
/**
* Determines appropriate ShardSpecs for a job by determining approximate cardinality of data set
* Determines appropriate ShardSpecs for a job by determining approximate cardinality of data set using HyperLogLog
*/
public class DeterminePartitionsUsingCardinalityJob implements Jobby
public class DetermineHashedPartitionsJob implements Jobby
{
private static final int MAX_SHARDS = 128;
private static final Logger log = new Logger(DeterminePartitionsUsingCardinalityJob.class);
private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
private final HadoopDruidIndexerConfig config;
private static final int HYPER_LOG_LOG_BIT_SIZE = 20;
public DeterminePartitionsUsingCardinalityJob(
public DetermineHashedPartitionsJob(
HadoopDruidIndexerConfig config
)
{
this.config = config;
}
public static void injectSystemProperties(Job job)
{
final Configuration conf = job.getConfiguration();
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
}
}
public boolean run()
{
try {
@ -97,10 +88,10 @@ public class DeterminePartitionsUsingCardinalityJob implements Jobby
long startTime = System.currentTimeMillis();
final Job groupByJob = new Job(
new Configuration(),
String.format("%s-determine_cardinality_grouped-%s", config.getDataSource(), config.getIntervals())
String.format("%s-determine_partitions_hashed-%s", config.getDataSource(), config.getIntervals())
);
injectSystemProperties(groupByJob);
JobHelper.injectSystemProperties(groupByJob);
groupByJob.setInputFormatClass(TextInputFormat.class);
groupByJob.setMapperClass(DetermineCardinalityMapper.class);
groupByJob.setMapOutputKeyClass(LongWritable.class);
@ -145,6 +136,7 @@ public class DeterminePartitionsUsingCardinalityJob implements Jobby
log.info("Determined Intervals for Job [%s]" + config.getSegmentGranularIntervals());
}
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
DateTime bucket = segmentGranularity.getStart();
@ -161,14 +153,17 @@ public class DeterminePartitionsUsingCardinalityJob implements Jobby
int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
if (numberOfShards > MAX_SHARDS) {
numberOfShards = MAX_SHARDS;
throw new ISE(
"Number of shards [%d] exceed the maximum limit of [%d], either targetPartitionSize is too low or data volume is too high",
numberOfShards,
MAX_SHARDS
);
}
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
if (numberOfShards == 1) {
actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), 0));
actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
} else {
int shardCount = 0;
for (int i = 0; i < numberOfShards; ++i) {
actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
@ -183,9 +178,8 @@ public class DeterminePartitionsUsingCardinalityJob implements Jobby
}
config.setShardSpecs(shardSpecs);
log.info(
"Determine partitions Using cardinality took %d millis shardSpecs %s",
(System.currentTimeMillis() - startTime),
shardSpecs
"DetermineHashedPartitionsJob took %d millis",
(System.currentTimeMillis() - startTime)
);
return true;
@ -218,7 +212,7 @@ public class DeterminePartitionsUsingCardinalityJob implements Jobby
determineIntervals = false;
final ImmutableMap.Builder<Interval, HyperLogLog> builder = ImmutableMap.builder();
for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals()) {
builder.put(bucketInterval, new HyperLogLog(20));
builder.put(bucketInterval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
}
hyperLogLogs = builder.build();
}
@ -249,7 +243,7 @@ public class DeterminePartitionsUsingCardinalityJob implements Jobby
interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(inputRow.getTimestampFromEpoch()));
if (!hyperLogLogs.containsKey(interval)) {
hyperLogLogs.put(interval, new HyperLogLog(20));
hyperLogLogs.put(interval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
}
} else {
final Optional<Interval> maybeInterval = config.getGranularitySpec()
@ -307,7 +301,7 @@ public class DeterminePartitionsUsingCardinalityJob implements Jobby
Context context
) throws IOException, InterruptedException
{
HyperLogLog aggregate = new HyperLogLog(20);
HyperLogLog aggregate = new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE);
for (BytesWritable value : values) {
HyperLogLog logValue = HyperLogLog.Builder.build(value.getBytes());
try {

View File

@ -107,16 +107,6 @@ public class DeterminePartitionsJob implements Jobby
this.config = config;
}
public static void injectSystemProperties(Job job)
{
final Configuration conf = job.getConfiguration();
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
}
}
public boolean run()
{
try {
@ -131,7 +121,7 @@ public class DeterminePartitionsJob implements Jobby
String.format("%s-determine_partitions_groupby-%s", config.getDataSource(), config.getIntervals())
);
injectSystemProperties(groupByJob);
JobHelper.injectSystemProperties(groupByJob);
groupByJob.setInputFormatClass(TextInputFormat.class);
groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class);
groupByJob.setMapOutputKeyClass(BytesWritable.class);
@ -168,7 +158,7 @@ public class DeterminePartitionsJob implements Jobby
dimSelectionJob.getConfiguration().set("io.sort.record.percent", "0.19");
injectSystemProperties(dimSelectionJob);
JobHelper.injectSystemProperties(dimSelectionJob);
if (!config.getPartitionsSpec().isAssumeGrouped()) {
// Read grouped data from the groupByJob.

View File

@ -74,7 +74,7 @@ public class HadoopDruidIndexerJob implements Jobby
if (config.isDeterminingPartitions()) {
if(config.getPartitionDimension() == null){
jobs.add(new DeterminePartitionsUsingCardinalityJob(config));
jobs.add(new DetermineHashedPartitionsJob(config));
} else {
jobs.add(new DeterminePartitionsJob(config));
}
@ -143,12 +143,7 @@ public class HadoopDruidIndexerJob implements Jobby
);
job.getConfiguration().set("io.sort.record.percent", "0.19");
for (String propName : System.getProperties().stringPropertyNames()) {
Configuration conf = job.getConfiguration();
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
}
JobHelper.injectSystemProperties(job);
config.addInputPaths(job);
}

View File

@ -84,9 +84,7 @@ import java.util.zip.ZipOutputStream;
public class IndexGeneratorJob implements Jobby
{
private static final Logger log = new Logger(IndexGeneratorJob.class);
private final HadoopDruidIndexerConfig config;
private IndexGeneratorStats jobStats;
public IndexGeneratorJob(
@ -97,6 +95,39 @@ public class IndexGeneratorJob implements Jobby
this.jobStats = new IndexGeneratorStats();
}
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
{
final Configuration conf = new Configuration();
final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
}
final Path descriptorInfoDir = config.makeDescriptorInfoDir();
try {
FileSystem fs = descriptorInfoDir.getFileSystem(conf);
for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class);
publishedSegmentsBuilder.add(segment);
log.info("Adding segment %s to the list of published segments", segment.getIdentifier());
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
List<DataSegment> publishedSegments = publishedSegmentsBuilder.build();
return publishedSegments;
}
public IndexGeneratorStats getJobStats()
{
return jobStats;
@ -112,12 +143,7 @@ public class IndexGeneratorJob implements Jobby
job.getConfiguration().set("io.sort.record.percent", "0.23");
for (String propName : System.getProperties().stringPropertyNames()) {
Configuration conf = job.getConfiguration();
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
}
JobHelper.injectSystemProperties(job);
job.setInputFormatClass(TextInputFormat.class);
@ -156,39 +182,6 @@ public class IndexGeneratorJob implements Jobby
}
}
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
{
final Configuration conf = new Configuration();
final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
}
final Path descriptorInfoDir = config.makeDescriptorInfoDir();
try {
FileSystem fs = descriptorInfoDir.getFileSystem(conf);
for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class);
publishedSegmentsBuilder.add(segment);
log.info("Adding segment %s to the list of published segments", segment.getIdentifier());
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
List<DataSegment> publishedSegments = publishedSegmentsBuilder.build();
return publishedSegments;
}
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text>
{
@ -225,7 +218,15 @@ public class IndexGeneratorJob implements Jobby
final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
bytes.position(4); // Skip length added by SortableBytes
int shardNum = bytes.getInt();
if (System.getProperty("mapred.job.tracker").equals("local")) {
return shardNum % numPartitions;
} else {
if (shardNum >= numPartitions) {
throw new ISE("Not enough partitions, shard[%,d] >= numPartitions[%,d]", shardNum, numPartitions);
}
return shardNum;
}
}
}

View File

@ -94,4 +94,14 @@ public class JobHelper
}
}
}
public static void injectSystemProperties(Job job)
{
final Configuration conf = job.getConfiguration();
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
}
}
}

View File

@ -39,9 +39,9 @@ import java.util.Set;
public class HashBasedNumberedShardSpec extends NumberedShardSpec
{
private static HashFunction hashFunction = null;
private static final HashFunction hashFunction = Hashing.murmur3_32();
@JacksonInject
public ObjectMapper jsonMapper;
private ObjectMapper jsonMapper;
@JsonCreator
public HashBasedNumberedShardSpec(
@ -50,7 +50,6 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
)
{
super(partitionNum, partitions);
hashFunction = Hashing.murmur3_32();
}
@Override

View File

@ -19,8 +19,12 @@
package io.druid;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.ISE;
import io.druid.guice.ServerModule;
import io.druid.jackson.DefaultObjectMapper;
@ -37,5 +41,20 @@ public class TestUtil
for (Module module : list) {
MAPPER.registerModule(module);
}
MAPPER.setInjectableValues(
new InjectableValues()
{
@Override
public Object findInjectableValue(
Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
)
{
if (valueId.equals("com.fasterxml.jackson.databind.ObjectMapper")) {
return TestUtil.MAPPER;
}
throw new ISE("No Injectable value found");
}
}
);
}
}

View File

@ -38,25 +38,6 @@ import java.util.List;
public class HashBasedNumberedShardSpecTest
{
@Before
public void setup()
{
TestUtil.MAPPER.setInjectableValues(
new InjectableValues()
{
@Override
public Object findInjectableValue(
Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
)
{
if (valueId.equals("com.fasterxml.jackson.databind.ObjectMapper")) {
return TestUtil.MAPPER;
}
throw new ISE("No Injectable value found");
}
}
);
}
@Test
public void testSerdeRoundTrip() throws Exception