more logging for determine hashed

This commit is contained in:
fjy 2014-05-30 16:19:20 -07:00
parent ee60183df7
commit 4c13327297
3 changed files with 31 additions and 15 deletions

View File

@ -158,22 +158,27 @@ public class DetermineHashedPartitionsJob implements Jobby
fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration()); fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration());
} }
if (Utils.exists(groupByJob, fileSystem, partitionInfoPath)) { if (Utils.exists(groupByJob, fileSystem, partitionInfoPath)) {
Long cardinality = config.jsonMapper.readValue( final Long numRows = config.jsonMapper.readValue(
Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>() Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
{ {
} }
); );
final int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
log.info("Found approximately [%,d] rows in data.", numRows);
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
if (numberOfShards == 1) { final int numberOfShards = (int) Math.ceil((double) numRows / config.getTargetPartitionSize());
actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
} else { log.info("Creating [%,d] shards", numberOfShards);
for (int i = 0; i < numberOfShards; ++i) {
actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++)); List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i)); if (numberOfShards == 1) {
actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
} else {
for (int i = 0; i < numberOfShards; ++i) {
actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
}
} }
}
shardSpecs.put(bucket, actualSpecs); shardSpecs.put(bucket, actualSpecs);

View File

@ -37,6 +37,7 @@ import com.google.inject.Module;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.TimestampParser;
import io.druid.common.utils.JodaUtils; import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.StringInputRowParser;

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -34,6 +35,7 @@ import com.google.common.primitives.Ints;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators; import com.metamx.common.guava.Comparators;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.TimestampParser;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
@ -551,4 +553,12 @@ public class IndexTask extends AbstractFixedIntervalTask
return rowFlushBoundary; return rowFlushBoundary;
} }
} }
public static void main(String[] args)
{
Function<String, DateTime> parser = TimestampParser.createTimestampParser("millis");
parser.apply("1401266370985");
}
} }