Use timestamp in millis as Map key instead of DateTime object (#3674)

* Use a Long timestamp (millis) as the map key instead of DateTime.

Map lookups silently break when a value is stored under one DateTime
object and read back with a different DateTime object: Joda-Time's
DateTime#equals and DateTime#hashCode compare the chronology (time zone)
as well as the instant, so two DateTimes for the same millisecond need
not be equal.

For example, the lookup below returns null when DateTime is used as the key:
```
// Key stored under a DateTime carrying the America/Los_Angeles chronology.
DateTime odt = DateTime.now(DateTimeUtils.getZone(DateTimeZone.forID("America/Los_Angeles")));
HashMap<DateTime, String> map = new HashMap<>();
map.put(odt, "abc");
// Same instant, but built with the JVM's default chronology/zone.
// DateTime#equals and #hashCode also compare the chronology, so this prints
// "null" unless the default zone happens to be America/Los_Angeles.
DateTime dt = new DateTime(odt.getMillis());
System.out.println(map.get(dt));
```
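
Keying the map by the epoch millis avoids this, because Long equality depends only on the numeric value. A minimal, self-contained sketch of the idea (illustrative only; the class and variable names are made up and not taken from the patch):

```
import java.util.HashMap;

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

// Illustrative sketch, not code from the patch.
public class MillisKeyExample
{
  public static void main(String[] args)
  {
    DateTime odt = DateTime.now(DateTimeZone.forID("America/Los_Angeles"));

    // Key by the instant (epoch millis) rather than by the DateTime object.
    HashMap<Long, String> map = new HashMap<>();
    map.put(odt.getMillis(), "abc");

    // Any DateTime representing the same instant yields the same key,
    // regardless of its zone/chronology.
    DateTime dt = new DateTime(odt.getMillis(), DateTimeZone.UTC);
    System.out.println(map.get(dt.getMillis())); // prints "abc"
  }
}
```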

* Respect the timezone when creating the segment partition info file.

* Update the docs with a timezone caveat in the granularity spec.

* Remove unused imports
Authored by praveev on 2016-11-11 10:20:20 -08:00, committed by Fangjin Yang
parent fd5451486c
commit 52a74cf84f
13 changed files with 51 additions and 50 deletions


@@ -188,7 +188,7 @@ This spec is used to generated segments with uniform intervals.
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| rollup | boolean | rollup or not | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
| timezone | string | The timezone to represent the interval offsets in. | no (default == 'UTC')
| timezone | string | The timezone to represent the interval offsets in. Only valid if intervals are explicitly specified for batch ingestion. Will not be valid for kafka based ingestion. | no (default == 'UTC')
### Arbitrary Granularity Spec
@@ -200,7 +200,7 @@ This spec is used to generate segments with arbitrary intervals (it tries to cre
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| rollup | boolean | rollup or not | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
| timezone | string | The timezone to represent the interval offsets in. | no (default == 'UTC')
| timezone | string | The timezone to represent the interval offsets in. Only valid if intervals are explicitly specified for batch ingestion. Will not be valid for kafka based ingestion. | no (default == 'UTC')
# IO Config
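
For reference, a short Joda-Time sketch (written for this page, not part of the patch) of what "representing the interval offsets in" a timezone means for the rows above: the same instant prints with a Z offset in UTC and a -08:00 offset in America/Los_Angeles.

```
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

// Illustrative sketch, not code from the patch.
public class IntervalOffsetExample
{
  public static void main(String[] args)
  {
    // A segment boundary as stored, in UTC ...
    DateTime utcStart = new DateTime("2016-11-11T00:00:00Z", DateTimeZone.UTC);
    System.out.println(utcStart); // 2016-11-11T00:00:00.000Z

    // ... and the same instant rendered in America/Los_Angeles (PST, -08:00 in November).
    System.out.println(utcStart.withZone(DateTimeZone.forID("America/Los_Angeles")));
    // 2016-11-10T16:00:00.000-08:00
  }
}
```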


@@ -302,11 +302,11 @@ public class OrcIndexGeneratorJobTest
}
}
private Map<DateTime, List<HadoopyShardSpec>> loadShardSpecs(
private Map<Long, List<HadoopyShardSpec>> loadShardSpecs(
Integer[][][] shardInfoForEachShard
)
{
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
Map<Long, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
int segmentNum = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
@@ -319,7 +319,7 @@ public class OrcIndexGeneratorJobTest
actualSpecs.add(new HadoopyShardSpec(spec, shardCount++));
}
shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
shardSpecs.put(segmentGranularity.getStartMillis(), actualSpecs);
}
return shardSpecs;


@@ -149,7 +149,7 @@ public class DetermineHashedPartitionsJob implements Jobby
);
log.info("Determined Intervals for Job [%s].", config.getSegmentGranularIntervals());
}
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
Map<Long, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
DateTime bucket = segmentGranularity.getStart();
@@ -191,7 +191,7 @@ public class DetermineHashedPartitionsJob implements Jobby
}
}
shardSpecs.put(bucket, actualSpecs);
shardSpecs.put(bucket.getMillis(), actualSpecs);
} else {
log.info("Path[%s] didn't exist!?", partitionInfoPath);
@@ -323,7 +323,12 @@ public class DetermineHashedPartitionsJob implements Jobby
HyperLogLogCollector.makeCollector(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()))
);
}
Interval interval = config.getGranularitySpec().getSegmentGranularity().bucket(new DateTime(key.get()));
Optional<Interval> intervalOptional = config.getGranularitySpec().bucketInterval(new DateTime(key.get()));
if (!intervalOptional.isPresent()) {
throw new ISE("WTF?! No bucket found for timestamp: %s", key.get());
}
Interval interval = intervalOptional.get();
intervals.add(interval);
final Path outPath = config.makeSegmentPartitionInfoPath(interval);
final OutputStream out = Utils.makePathAndOutputStream(


@@ -65,7 +65,6 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
import org.joda.time.Interval;
import java.io.IOException;
@@ -218,7 +217,7 @@ public class DeterminePartitionsJob implements Jobby
log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals());
FileSystem fileSystem = null;
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
Map<Long, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap();
int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity);
@@ -238,7 +237,7 @@ public class DeterminePartitionsJob implements Jobby
log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
}
shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
shardSpecs.put(segmentGranularity.getStartMillis(), actualSpecs);
} else {
log.info("Path[%s] didn't exist!?", partitionInfoPath);
}
@@ -370,17 +369,17 @@ public class DeterminePartitionsJob implements Jobby
{
private final HadoopDruidIndexerConfig config;
private final String partitionDimension;
private final Map<DateTime, Integer> intervalIndexes;
private final Map<Long, Integer> intervalIndexes;
public DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension)
{
this.config = config;
this.partitionDimension = partitionDimension;
final ImmutableMap.Builder<DateTime, Integer> timeIndexBuilder = ImmutableMap.builder();
final ImmutableMap.Builder<Long, Integer> timeIndexBuilder = ImmutableMap.builder();
int idx = 0;
for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals().get()) {
timeIndexBuilder.put(bucketInterval.getStart(), idx);
timeIndexBuilder.put(bucketInterval.getStartMillis(), idx);
idx++;
}
@@ -400,7 +399,7 @@ public class DeterminePartitionsJob implements Jobby
}
final Interval interval = maybeInterval.get();
final int intervalIndex = intervalIndexes.get(interval.getStart());
final int intervalIndex = intervalIndexes.get(interval.getStartMillis());
final ByteBuffer buf = ByteBuffer.allocate(4 + 8);
buf.putInt(intervalIndex);


@@ -27,7 +27,6 @@ import io.druid.java.util.common.logger.Logger;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
import org.joda.time.Interval;
import java.util.List;
@@ -59,7 +58,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
jobs.add(config.getPartitionsSpec().getPartitionJob(config));
} else {
int shardsPerInterval = config.getPartitionsSpec().getNumShards();
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
Map<Long, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap();
int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
DateTime bucket = segmentGranularity.getStart();
@@ -78,11 +77,11 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
)
);
}
shardSpecs.put(bucket, specs);
shardSpecs.put(bucket.getMillis(), specs);
log.info("DateTime[%s], spec[%s]", bucket, specs);
} else {
final HadoopyShardSpec spec = new HadoopyShardSpec(NoneShardSpec.instance(), shardCount++);
shardSpecs.put(bucket, Lists.newArrayList(spec));
shardSpecs.put(bucket.getMillis(), Lists.newArrayList(spec));
log.info("DateTime[%s], spec[%s]", bucket, spec);
}
}


@@ -215,8 +215,8 @@ public class HadoopDruidIndexerConfig
private volatile HadoopIngestionSpec schema;
private volatile PathSpec pathSpec;
private final Map<DateTime, ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
private final Map<DateTime, Map<ShardSpec, HadoopyShardSpec>> hadoopShardSpecLookup = Maps.newHashMap();
private final Map<Long, ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
private final Map<Long, Map<ShardSpec, HadoopyShardSpec>> hadoopShardSpecLookup = Maps.newHashMap();
private final QueryGranularity rollupGran;
@JsonCreator
@@ -226,7 +226,7 @@ public class HadoopDruidIndexerConfig
{
this.schema = spec;
this.pathSpec = JSON_MAPPER.convertValue(spec.getIOConfig().getPathSpec(), PathSpec.class);
for (Map.Entry<DateTime, List<HadoopyShardSpec>> entry : spec.getTuningConfig().getShardSpecs().entrySet()) {
for (Map.Entry<Long, List<HadoopyShardSpec>> entry : spec.getTuningConfig().getShardSpecs().entrySet()) {
if (entry.getValue() == null || entry.getValue().isEmpty()) {
continue;
}
@@ -310,7 +310,7 @@ public class HadoopDruidIndexerConfig
this.pathSpec = JSON_MAPPER.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
}
public void setShardSpecs(Map<DateTime, List<HadoopyShardSpec>> shardSpecs)
public void setShardSpecs(Map<Long, List<HadoopyShardSpec>> shardSpecs)
{
this.schema = schema.withTuningConfig(schema.getTuningConfig().withShardSpecs(shardSpecs));
this.pathSpec = JSON_MAPPER.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
@@ -363,12 +363,12 @@ public class HadoopDruidIndexerConfig
public HadoopyShardSpec getShardSpec(Bucket bucket)
{
return schema.getTuningConfig().getShardSpecs().get(bucket.time).get(bucket.partitionNum);
return schema.getTuningConfig().getShardSpecs().get(bucket.time.getMillis()).get(bucket.partitionNum);
}
public int getShardSpecCount(Bucket bucket)
{
return schema.getTuningConfig().getShardSpecs().get(bucket.time).size();
return schema.getTuningConfig().getShardSpecs().get(bucket.time.getMillis()).size();
}
public boolean isBuildV9Directly()
@@ -411,12 +411,12 @@ public class HadoopDruidIndexerConfig
return Optional.absent();
}
final DateTime bucketStart = timeBucket.get().getStart();
final ShardSpec actualSpec = shardSpecLookups.get(bucketStart)
final ShardSpec actualSpec = shardSpecLookups.get(bucketStart.getMillis())
.getShardSpec(
rollupGran.truncate(inputRow.getTimestampFromEpoch()),
inputRow
);
final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(bucketStart).get(actualSpec);
final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(bucketStart.getMillis()).get(actualSpec);
return Optional.of(
new Bucket(
@@ -452,7 +452,7 @@ public class HadoopDruidIndexerConfig
public Iterable<Bucket> apply(Interval input)
{
final DateTime bucketTime = input.getStart();
final List<HadoopyShardSpec> specs = schema.getTuningConfig().getShardSpecs().get(bucketTime);
final List<HadoopyShardSpec> specs = schema.getTuningConfig().getShardSpecs().get(bucketTime.getMillis());
if (specs == null) {
return ImmutableList.of();
}


@@ -39,7 +39,7 @@ import java.util.Map;
public class HadoopTuningConfig implements TuningConfig
{
private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
private static final Map<DateTime, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.of();
private static final Map<Long, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.of();
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 75000;
private static final boolean DEFAULT_USE_COMBINER = false;
@@ -73,7 +73,7 @@ public class HadoopTuningConfig implements TuningConfig
private final String workingPath;
private final String version;
private final PartitionsSpec partitionsSpec;
private final Map<DateTime, List<HadoopyShardSpec>> shardSpecs;
private final Map<Long, List<HadoopyShardSpec>> shardSpecs;
private final IndexSpec indexSpec;
private final int rowFlushBoundary;
private final boolean leaveIntermediate;
@@ -93,7 +93,7 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("workingPath") String workingPath,
final @JsonProperty("version") String version,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("shardSpecs") Map<Long, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("indexSpec") IndexSpec indexSpec,
final @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
@@ -156,7 +156,7 @@ public class HadoopTuningConfig implements TuningConfig
}
@JsonProperty
public Map<DateTime, List<HadoopyShardSpec>> getShardSpecs()
public Map<Long, List<HadoopyShardSpec>> getShardSpecs()
{
return shardSpecs;
}
@@ -287,7 +287,7 @@ public class HadoopTuningConfig implements TuningConfig
);
}
public HadoopTuningConfig withShardSpecs(Map<DateTime, List<HadoopyShardSpec>> specs)
public HadoopTuningConfig withShardSpecs(Map<Long, List<HadoopyShardSpec>> specs)
{
return new HadoopTuningConfig(
workingPath,


@@ -74,7 +74,7 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
final FileSystem fs = betaInput.getFileSystem(job.getConfiguration());
final Granularity segmentGranularity = config.getGranularitySpec().getSegmentGranularity();
Map<DateTime, Long> inputModifiedTimes = new TreeMap<>(
Map<Long, Long> inputModifiedTimes = new TreeMap<>(
Comparators.inverse(Comparators.comparable())
);
@@ -83,12 +83,12 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
final Long currVal = inputModifiedTimes.get(key);
final long mTime = status.getModificationTime();
inputModifiedTimes.put(key, currVal == null ? mTime : Math.max(currVal, mTime));
inputModifiedTimes.put(key.getMillis(), currVal == null ? mTime : Math.max(currVal, mTime));
}
Set<Interval> bucketsToRun = Sets.newTreeSet(Comparators.intervals());
for (Map.Entry<DateTime, Long> entry : inputModifiedTimes.entrySet()) {
DateTime timeBucket = entry.getKey();
for (Map.Entry<Long, Long> entry : inputModifiedTimes.entrySet()) {
DateTime timeBucket = new DateTime(entry.getKey());
long mTime = entry.getValue();
String bucketOutput = String.format(


@@ -392,8 +392,8 @@ public class BatchDeltaIngestionTest
);
config.setShardSpecs(
ImmutableMap.<DateTime, List<HadoopyShardSpec>>of(
INTERVAL_FULL.getStart(),
ImmutableMap.<Long, List<HadoopyShardSpec>>of(
INTERVAL_FULL.getStartMillis(),
ImmutableList.of(
new HadoopyShardSpec(
new HashBasedNumberedShardSpec(0, 1, null, HadoopDruidIndexerConfig.JSON_MAPPER),


@@ -33,7 +33,6 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@@ -175,13 +174,13 @@ public class DetermineHashedPartitionsJobTest
public void testDetermineHashedPartitions(){
DetermineHashedPartitionsJob determineHashedPartitionsJob = new DetermineHashedPartitionsJob(indexerConfig);
determineHashedPartitionsJob.run();
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = indexerConfig.getSchema().getTuningConfig().getShardSpecs();
Map<Long, List<HadoopyShardSpec>> shardSpecs = indexerConfig.getSchema().getTuningConfig().getShardSpecs();
Assert.assertEquals(
expectedNumTimeBuckets,
shardSpecs.entrySet().size()
);
int i=0;
for(Map.Entry<DateTime, List<HadoopyShardSpec>> entry : shardSpecs.entrySet()) {
for(Map.Entry<Long, List<HadoopyShardSpec>> entry : shardSpecs.entrySet()) {
Assert.assertEquals(
expectedNumOfShards[i++],
entry.getValue().size(),


@@ -35,7 +35,6 @@ import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
@@ -285,7 +284,7 @@ public class DeterminePartitionsJobTest
int segmentNum = 0;
Assert.assertEquals(expectedNumOfSegments, config.getSchema().getTuningConfig().getShardSpecs().size());
for (Map.Entry<DateTime, List<HadoopyShardSpec>> entry : config.getSchema()
for (Map.Entry<Long, List<HadoopyShardSpec>> entry : config.getSchema()
.getTuningConfig()
.getShardSpecs()
.entrySet()) {


@@ -214,7 +214,7 @@ public class HadoopDruidIndexerConfigTest
null,
null,
null,
ImmutableMap.of(new DateTime("2010-01-01T01:00:00"), specs),
ImmutableMap.of(new DateTime("2010-01-01T01:00:00").getMillis(), specs),
null,
null,
false,
@@ -276,12 +276,12 @@ public class HadoopDruidIndexerConfigTest
null,
null,
null,
ImmutableMap.<DateTime, List<HadoopyShardSpec>>of(new DateTime("2010-01-01T01:00:00"),
ImmutableMap.<Long, List<HadoopyShardSpec>>of(new DateTime("2010-01-01T01:00:00").getMillis(),
Lists.newArrayList(new HadoopyShardSpec(
NoneShardSpec.instance(),
1
)),
new DateTime("2010-01-01T02:00:00"),
new DateTime("2010-01-01T02:00:00").getMillis(),
Lists.newArrayList(new HadoopyShardSpec(
NoneShardSpec.instance(),
2


@@ -546,12 +546,12 @@ public class IndexGeneratorJobTest
return specs;
}
private Map<DateTime, List<HadoopyShardSpec>> loadShardSpecs(
private Map<Long, List<HadoopyShardSpec>> loadShardSpecs(
String partitionType,
Object[][][] shardInfoForEachShard
)
{
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
Map<Long, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
int segmentNum = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
@@ -561,7 +561,7 @@ public class IndexGeneratorJobTest
actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
}
shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
shardSpecs.put(segmentGranularity.getStartMillis(), actualSpecs);
}
return shardSpecs;