added contextual time parse (#2867)

This commit is contained in:
John Wang 2016-04-25 13:35:10 -07:00 committed by Xavier Léauté
parent 55785267e4
commit 5658bd99eb
3 changed files with 160 additions and 6 deletions

View File

@ -20,7 +20,6 @@ package io.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.metamx.common.parsers.ParserUtils;
import com.metamx.common.parsers.TimestampParser;
import org.joda.time.DateTime;
@ -30,6 +29,12 @@ import java.util.Map;
*/
public class TimestampSpec
{
/**
 * Holder pairing the most recently seen raw timestamp value with the DateTime that was produced for it.
 * Both fields start out as null.
 */
private static class ParseCtx
{
  // raw value last read from the timestamp column
  Object lastTimeObject;
  // parse result stored together with lastTimeObject
  DateTime lastDateTime;
}
// Default column name. NOTE(review): its use is not visible in this excerpt — presumably applied when no
// column is configured; confirm against the constructor.
private static final String DEFAULT_COLUMN = "timestamp";
// Format substituted by the constructor when the supplied format is null.
private static final String DEFAULT_FORMAT = "auto";
// Value substituted by the constructor when the supplied missingValue is null.
private static final DateTime DEFAULT_MISSING_VALUE = null;
@ -40,6 +45,9 @@ public class TimestampSpec
// this value should never be set for production data
private final DateTime missingValue;
// remember last value parsed: extractTimestamp compares each incoming raw value against this context and
// installs a fresh ParseCtx (rather than mutating this one) whenever the value differs
private ParseCtx parseCtx = new ParseCtx();
@JsonCreator
public TimestampSpec(
@JsonProperty("column") String timestampColumn,
@ -52,8 +60,8 @@ public class TimestampSpec
this.timestampFormat = format == null ? DEFAULT_FORMAT : format;
this.timestampConverter = TimestampParser.createObjectTimestampParser(timestampFormat);
this.missingValue = missingValue == null
? DEFAULT_MISSING_VALUE
: missingValue;
? DEFAULT_MISSING_VALUE
: missingValue;
}
@JsonProperty("column")
@ -77,8 +85,19 @@ public class TimestampSpec
/**
 * Extracts the timestamp of a row.
 *
 * The raw value is looked up under the configured timestamp column. If it equals the raw value seen by the
 * previous call, the DateTime produced for that call is returned again instead of running the converter.
 *
 * @param input row to read the timestamp column from
 *
 * @return the converted timestamp, or the configured missing value when the column is absent or null
 */
public DateTime extractTimestamp(Map<String, Object> input)
{
  final Object o = input.get(timestampColumn);
  if (o == null) {
    return missingValue;
  }

  // Take a single snapshot of the field so that the comparison and the returned value come from the same
  // context object even if the field is reassigned in between.
  // NOTE(review): the field is not volatile, so this alone does not make the cache safe for concurrent use.
  final ParseCtx cached = parseCtx;
  if (o.equals(cached.lastTimeObject)) {
    return cached.lastDateTime;
  }

  // Convert first; the cache is only replaced once the conversion has succeeded.
  final DateTime extracted = timestampConverter.apply(o);
  final ParseCtx newCtx = new ParseCtx();
  newCtx.lastTimeObject = o;
  newCtx.lastDateTime = extracted;
  parseCtx = newCtx;
  return extracted;
}
@Override

View File

@ -20,8 +20,9 @@
package io.druid.data.input.impl;
import com.google.common.collect.ImmutableMap;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.format.ISODateTimeFormat;
import org.junit.Assert;
import org.junit.Test;
public class TimestampSpecTest
@ -45,4 +46,25 @@ public class TimestampSpecTest
spec.extractTimestamp(ImmutableMap.<String, Object>of("dim", "foo"))
);
}
/**
 * Feeds a sequence of timestamp strings, including two equal consecutive values, through one TimestampSpec
 * and checks every result against an independent ISO parse of the same string.
 */
@Test
public void testContextualTimestampList() throws Exception
{
  final String pattern = "yyyy-MM-dd'T'HH:mm:ss";
  final String[] inputs = {
      "2000-01-01T05:00:00",
      "2000-01-01T05:00:01",
      "2000-01-01T05:00:01",
      "2000-01-01T05:00:02",
      "2000-01-01T05:00:03",
  };
  final TimestampSpec timestampSpec = new TimestampSpec("TIMEstamp", pattern, null);

  for (String raw : inputs) {
    final DateTime actual = timestampSpec.extractTimestamp(ImmutableMap.<String, Object>of("TIMEstamp", raw));
    final DateTime expected = ISODateTimeFormat.dateHourMinuteSecond().parseDateTime(raw);
    Assert.assertEquals(expected, actual);
  }
}
}

View File

@ -0,0 +1,113 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark;
import com.google.common.base.Function;
import com.metamx.common.parsers.TimestampParser;
import org.joda.time.DateTime;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
 * JMH benchmark comparing two ways of turning a column of timestamp strings into millis: parsing every row,
 * versus reusing the previous result whenever a row's string equals the one before it.
 */
@State(Scope.Benchmark)
public class TimeParseBenchmark
{
  // 1 million rows
  int numRows = 1000000;

  // Number of batches of same times
  @Param({"10000", "100000", "500000", "1000000"})
  int numBatches;

  static final String DATA_FORMAT = "MM/dd/yyyy HH:mm:ss Z";

  static Function<String, DateTime> timeFn = TimestampParser.createTimestampParser(DATA_FORMAT);

  private String[] rows;

  /**
   * Fills {@code rows} with formatted timestamps. Rows are grouped into runs of numRows / numBatches equal
   * values; each new run is 5 seconds later than the previous one.
   */
  @Setup
  public void setup()
  {
    final SimpleDateFormat formatter = new SimpleDateFormat(DATA_FORMAT);
    long currentMillis = System.currentTimeMillis();
    final int batchSize = numRows / numBatches;
    int filledInBatch = 0;

    rows = new String[numRows];
    for (int idx = 0; idx < numRows; idx++) {
      if (filledInBatch >= batchSize) {
        // new batch, add 5 seconds
        filledInBatch = 0;
        currentMillis += 5000;
      }
      rows[idx] = formatter.format(new Date(currentMillis));
      filledInBatch++;
    }
  }

  /**
   * Parses every row with the timestamp function, regardless of repetition.
   */
  @Benchmark
  @BenchmarkMode(Mode.AverageTime)
  @OutputTimeUnit(TimeUnit.NANOSECONDS)
  public void parseNoContext(Blackhole blackhole)
  {
    for (String row : rows) {
      blackhole.consume(timeFn.apply(row).getMillis());
    }
  }

  /**
   * Parses a row only when its string differs from the previously parsed one; otherwise the previous millis
   * value is consumed again.
   */
  @Benchmark
  @BenchmarkMode(Mode.AverageTime)
  @OutputTimeUnit(TimeUnit.NANOSECONDS)
  public void parseWithContext(Blackhole blackhole)
  {
    String previousRow = null;
    long previousMillis = 0L;
    for (String row : rows) {
      if (!row.equals(previousRow)) {
        previousRow = row;
        previousMillis = timeFn.apply(row).getMillis();
      }
      blackhole.consume(previousMillis);
    }
  }

  /**
   * Runs this benchmark class through the JMH runner with 1 warmup iteration, 10 measurement iterations
   * and a single fork.
   */
  public static void main(String[] args) throws RunnerException
  {
    final Options options = new OptionsBuilder()
        .include(TimeParseBenchmark.class.getSimpleName())
        .warmupIterations(1)
        .measurementIterations(10)
        .forks(1)
        .build();

    new Runner(options).run();
  }
}