enable tb to return just max or min time

This commit is contained in:
fjy 2014-06-16 13:45:32 -07:00
parent 0fd57dbc75
commit d4a47fe6e8
5 changed files with 145 additions and 32 deletions

View File

@ -692,12 +692,14 @@ public class Druids
{
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private String exclude;
private Map<String, Object> context;
public TimeBoundaryQueryBuilder()
{
dataSource = null;
querySegmentSpec = null;
exclude = null;
context = null;
}
@ -706,6 +708,7 @@ public class Druids
return new TimeBoundaryQuery(
dataSource,
querySegmentSpec,
exclude,
context
);
}
@ -715,6 +718,7 @@ public class Druids
return new TimeBoundaryQueryBuilder()
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.exclude(builder.exclude)
.context(builder.context);
}
@ -748,6 +752,12 @@ public class Druids
return this;
}
public TimeBoundaryQueryBuilder exclude(String ex)
{
exclude = ex;
return this;
}
public TimeBoundaryQueryBuilder context(Map<String, Object> c)
{
context = c;

View File

@ -50,10 +50,13 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
public static final String MIN_TIME = "minTime";
private static final byte CACHE_TYPE_ID = 0x0;
private final String exclude;
@JsonCreator
public TimeBoundaryQuery(
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("exclude") String exclude,
@JsonProperty("context") Map<String, Object> context
)
{
@ -63,6 +66,8 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
: querySegmentSpec,
context
);
this.exclude = exclude == null ? "" : exclude;
}
@Override
@ -77,12 +82,19 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
return Query.TIME_BOUNDARY;
}
@JsonProperty
public String getExclude()
{
return exclude;
}
@Override
public TimeBoundaryQuery withOverriddenContext(Map<String, Object> contextOverrides)
{
return new TimeBoundaryQuery(
getDataSource(),
getQuerySegmentSpec(),
exclude,
computeOverridenContext(contextOverrides)
);
}
@ -93,6 +105,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
return new TimeBoundaryQuery(
getDataSource(),
spec,
exclude,
getContext()
);
}
@ -103,14 +116,17 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
return new TimeBoundaryQuery(
dataSource,
getQuerySegmentSpec(),
exclude,
getContext()
);
}
public byte[] getCacheKey()
{
return ByteBuffer.allocate(1)
final byte[] excludeBytes = exclude.getBytes();
return ByteBuffer.allocate(1 + excludeBytes.length)
.put(CACHE_TYPE_ID)
.put(excludeBytes)
.array();
}
@ -121,6 +137,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
"dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() +
", duration=" + getDuration() +
", exclude" + exclude +
'}';
}
@ -129,14 +146,14 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
List<Result<TimeBoundaryResultValue>> results = Lists.newArrayList();
Map<String, Object> result = Maps.newHashMap();
if (min != null) {
result.put(TimeBoundaryQuery.MIN_TIME, min);
if (min != null && !exclude.equalsIgnoreCase(MIN_TIME)) {
result.put(MIN_TIME, min);
}
if (max != null) {
result.put(TimeBoundaryQuery.MAX_TIME, max);
if (max != null && !exclude.equalsIgnoreCase(MAX_TIME)) {
result.put(MAX_TIME, max);
}
if (!result.isEmpty()) {
results.add(new Result<TimeBoundaryResultValue>(timestamp, new TimeBoundaryResultValue(result)));
results.add(new Result<>(timestamp, new TimeBoundaryResultValue(result)));
}
return results;
@ -153,24 +170,40 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
for (Result<TimeBoundaryResultValue> result : results) {
TimeBoundaryResultValue val = result.getValue();
DateTime currMinTime = val.getMinTime();
if (currMinTime.isBefore(min)) {
min = currMinTime;
if (!exclude.equalsIgnoreCase(MIN_TIME)) {
DateTime currMinTime = val.getMinTime();
if (currMinTime.isBefore(min)) {
min = currMinTime;
}
}
DateTime currMaxTime = val.getMaxTime();
if (currMaxTime.isAfter(max)) {
max = currMaxTime;
if (!exclude.equalsIgnoreCase(MAX_TIME)) {
DateTime currMaxTime = val.getMaxTime();
if (currMaxTime.isAfter(max)) {
max = currMaxTime;
}
}
}
final ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>();
final DateTime ts;
if (exclude.equalsIgnoreCase(MIN_TIME)) {
ts = max;
builder.put(MAX_TIME, max);
} else if (exclude.equalsIgnoreCase(MAX_TIME)) {
ts = min;
builder.put(MIN_TIME, min);
} else {
ts = min;
builder.put(MAX_TIME, max);
builder.put(MIN_TIME, min);
}
return Arrays.asList(
new Result<TimeBoundaryResultValue>(
min,
new Result<>(
ts,
new TimeBoundaryResultValue(
ImmutableMap.<String, Object>of(
TimeBoundaryQuery.MIN_TIME, min,
TimeBoundaryQuery.MAX_TIME, max
)
builder.build()
)
)
);

View File

@ -67,21 +67,45 @@ public class TimeBoundaryQueryQueryToolChest
return segments;
}
final T first = segments.get(0);
final T second = segments.get(segments.size() - 1);
final T min = segments.get(0);
final T max = segments.get(segments.size() - 1);
final Predicate<T> filterPredicate;
// optimizations to avoid hitting too many segments
if (query.getExclude().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME)) {
filterPredicate = new Predicate<T>()
{
@Override
public boolean apply(T input)
{
return input.getInterval().overlaps(min.getInterval());
}
};
} else if (query.getExclude().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME)) {
filterPredicate = new Predicate<T>()
{
@Override
public boolean apply(T input)
{
return input.getInterval().overlaps(max.getInterval());
}
};
} else {
filterPredicate = new Predicate<T>()
{
@Override
public boolean apply(T input)
{
return input.getInterval().overlaps(min.getInterval()) || input.getInterval()
.overlaps(max.getInterval());
}
};
}
return Lists.newArrayList(
Iterables.filter(
segments,
new Predicate<T>()
{
@Override
public boolean apply(T input)
{
return input.getInterval().overlaps(first.getInterval()) || input.getInterval()
.overlaps(second.getInterval());
}
}
filterPredicate
)
);
}
@ -146,9 +170,9 @@ public class TimeBoundaryQueryQueryToolChest
public byte[] computeCacheKey(TimeBoundaryQuery query)
{
return ByteBuffer.allocate(2)
.put(TIMEBOUNDARY_QUERY)
.put(query.getCacheKey())
.array();
.put(TIMEBOUNDARY_QUERY)
.put(query.getCacheKey())
.array();
}
@Override

View File

@ -99,6 +99,10 @@ public class TimeBoundaryResultValue
private DateTime getDateTimeValue(Object val)
{
if (val == null) {
return null;
}
if (val instanceof DateTime) {
return (DateTime) val;
} else if (val instanceof String) {

View File

@ -78,4 +78,46 @@ public class TimeBoundaryQueryRunnerTest
Assert.assertEquals(new DateTime("2011-01-12T00:00:00.000Z"), minTime);
Assert.assertEquals(new DateTime("2011-04-15T00:00:00.000Z"), maxTime);
}
@Test
@SuppressWarnings("unchecked")
public void testTimeBoundaryExcludesMin()
{
TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.exclude(TimeBoundaryQuery.MIN_TIME)
.build();
Iterable<Result<TimeBoundaryResultValue>> results = Sequences.toList(
runner.run(timeBoundaryQuery),
Lists.<Result<TimeBoundaryResultValue>>newArrayList()
);
TimeBoundaryResultValue val = results.iterator().next().getValue();
DateTime minTime = val.getMinTime();
DateTime maxTime = val.getMaxTime();
Assert.assertNull(minTime);
Assert.assertEquals(new DateTime("2011-04-15T00:00:00.000Z"), maxTime);
}
@Test
@SuppressWarnings("unchecked")
public void testTimeBoundaryExcludesMax()
{
TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.exclude(TimeBoundaryQuery.MAX_TIME)
.build();
Iterable<Result<TimeBoundaryResultValue>> results = Sequences.toList(
runner.run(timeBoundaryQuery),
Lists.<Result<TimeBoundaryResultValue>>newArrayList()
);
TimeBoundaryResultValue val = results.iterator().next().getValue();
DateTime minTime = val.getMinTime();
DateTime maxTime = val.getMaxTime();
Assert.assertEquals(new DateTime("2011-01-12T00:00:00.000Z"), minTime);
Assert.assertNull(maxTime);
}
}