mirror of https://github.com/apache/druid.git
fixes to init and more unit tests for timeseries
This commit is contained in:
parent
5cf21e3c8a
commit
17fb93d049
|
@ -25,6 +25,7 @@ import com.metamx.druid.query.search.SearchQuery;
|
||||||
import com.metamx.druid.query.segment.QuerySegmentSpec;
|
import com.metamx.druid.query.segment.QuerySegmentSpec;
|
||||||
import com.metamx.druid.query.segment.QuerySegmentWalker;
|
import com.metamx.druid.query.segment.QuerySegmentWalker;
|
||||||
import com.metamx.druid.query.timeboundary.TimeBoundaryQuery;
|
import com.metamx.druid.query.timeboundary.TimeBoundaryQuery;
|
||||||
|
import com.metamx.druid.query.timeseries.TimeseriesQuery;
|
||||||
import org.codehaus.jackson.annotate.JsonSubTypes;
|
import org.codehaus.jackson.annotate.JsonSubTypes;
|
||||||
import org.codehaus.jackson.annotate.JsonTypeInfo;
|
import org.codehaus.jackson.annotate.JsonTypeInfo;
|
||||||
import org.joda.time.Duration;
|
import org.joda.time.Duration;
|
||||||
|
@ -35,12 +36,14 @@ import java.util.Map;
|
||||||
|
|
||||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "queryType")
|
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "queryType")
|
||||||
@JsonSubTypes(value = {
|
@JsonSubTypes(value = {
|
||||||
|
@JsonSubTypes.Type(name = Query.TIMESERIES, value = TimeseriesQuery.class),
|
||||||
@JsonSubTypes.Type(name = Query.SEARCH, value = SearchQuery.class),
|
@JsonSubTypes.Type(name = Query.SEARCH, value = SearchQuery.class),
|
||||||
@JsonSubTypes.Type(name = Query.TIME_BOUNDARY, value = TimeBoundaryQuery.class),
|
@JsonSubTypes.Type(name = Query.TIME_BOUNDARY, value = TimeBoundaryQuery.class),
|
||||||
@JsonSubTypes.Type(name = "groupBy", value= GroupByQuery.class)
|
@JsonSubTypes.Type(name = "groupBy", value= GroupByQuery.class)
|
||||||
})
|
})
|
||||||
public interface Query<T>
|
public interface Query<T>
|
||||||
{
|
{
|
||||||
|
public static final String TIMESERIES = "timeseries";
|
||||||
public static final String SEARCH = "search";
|
public static final String SEARCH = "search";
|
||||||
public static final String TIME_BOUNDARY = "timeBoundary";
|
public static final String TIME_BOUNDARY = "timeBoundary";
|
||||||
|
|
||||||
|
|
|
@ -393,16 +393,14 @@ public class Initialization
|
||||||
|
|
||||||
// validate druid.zk.paths.*Path properties
|
// validate druid.zk.paths.*Path properties
|
||||||
//
|
//
|
||||||
// if any zpath overrides are set in properties, all must be set, and they must start with /
|
// if any zpath overrides are set in properties, they must start with /
|
||||||
int zpathOverrideCount = 0;
|
int zpathOverrideCount = 0;
|
||||||
boolean zpathOverridesNotAbs = false;
|
|
||||||
StringBuilder sbErrors = new StringBuilder(100);
|
StringBuilder sbErrors = new StringBuilder(100);
|
||||||
for (int i = 0; i < SUB_PATH_PROPS.length; i++) {
|
for (int i = 0; i < SUB_PATH_PROPS.length; i++) {
|
||||||
String val = props.getProperty(SUB_PATH_PROPS[i]);
|
String val = props.getProperty(SUB_PATH_PROPS[i]);
|
||||||
if (val != null) {
|
if (val != null) {
|
||||||
zpathOverrideCount++;
|
zpathOverrideCount++;
|
||||||
if (!val.startsWith("/")) {
|
if (!val.startsWith("/")) {
|
||||||
zpathOverridesNotAbs = true;
|
|
||||||
sbErrors.append(SUB_PATH_PROPS[i]).append("=").append(val).append("\n");
|
sbErrors.append(SUB_PATH_PROPS[i]).append("=").append(val).append("\n");
|
||||||
zpathValidateFailed = true;
|
zpathValidateFailed = true;
|
||||||
}
|
}
|
||||||
|
@ -412,36 +410,11 @@ public class Initialization
|
||||||
if (propertiesZpathOverride != null) {
|
if (propertiesZpathOverride != null) {
|
||||||
zpathOverrideCount++;
|
zpathOverrideCount++;
|
||||||
if (!propertiesZpathOverride.startsWith("/")) {
|
if (!propertiesZpathOverride.startsWith("/")) {
|
||||||
zpathOverridesNotAbs = true;
|
|
||||||
sbErrors.append("druid.zk.paths.propertiesPath").append("=").append(propertiesZpathOverride).append("\n");
|
sbErrors.append("druid.zk.paths.propertiesPath").append("=").append(propertiesZpathOverride).append("\n");
|
||||||
zpathValidateFailed = true;
|
zpathValidateFailed = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (zpathOverridesNotAbs) {
|
if (zpathOverrideCount == 0) {
|
||||||
System.err.println(
|
|
||||||
"When overriding zk zpaths, with properties like druid.zk.paths.*Path " +
|
|
||||||
"the znode path must start with '/' (slash) ; problem overrides:"
|
|
||||||
);
|
|
||||||
System.err.print(sbErrors.toString());
|
|
||||||
}
|
|
||||||
if (zpathOverrideCount > 0) {
|
|
||||||
if (zpathOverrideCount < SUB_PATH_PROPS.length) {
|
|
||||||
zpathValidateFailed = true;
|
|
||||||
System.err.println(
|
|
||||||
"When overriding zk zpaths, with properties of form druid.zk.paths.*Path " +
|
|
||||||
"all must be overridden together; missing overrides:"
|
|
||||||
);
|
|
||||||
for (int i = 0; i < SUB_PATH_PROPS.length; i++) {
|
|
||||||
String val = props.getProperty(SUB_PATH_PROPS[i]);
|
|
||||||
if (val == null) {
|
|
||||||
System.err.println(" " + SUB_PATH_PROPS[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else { // proper overrides
|
|
||||||
// do not prefix with property druid.zk.paths.base
|
|
||||||
; // fallthru
|
|
||||||
}
|
|
||||||
} else { // no overrides
|
|
||||||
if (propertyZpath == null) { // if default base is used, store it as documentation
|
if (propertyZpath == null) { // if default base is used, store it as documentation
|
||||||
props.setProperty("druid.zk.paths.base", zpathEffective);
|
props.setProperty("druid.zk.paths.base", zpathEffective);
|
||||||
}
|
}
|
||||||
|
@ -453,6 +426,15 @@ public class Initialization
|
||||||
}
|
}
|
||||||
props.setProperty("druid.zk.paths.propertiesPath", zpathEffective + "/properties");
|
props.setProperty("druid.zk.paths.propertiesPath", zpathEffective + "/properties");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (zpathValidateFailed) {
|
||||||
|
System.err.println(
|
||||||
|
"When overriding zk zpaths, with properties like druid.zk.paths.*Path " +
|
||||||
|
"the znode path must start with '/' (slash) ; problem overrides:"
|
||||||
|
);
|
||||||
|
System.err.print(sbErrors.toString());
|
||||||
|
}
|
||||||
|
|
||||||
return !zpathValidateFailed;
|
return !zpathValidateFailed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,221 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.metamx.druid;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.metamx.druid.aggregation.AggregatorFactory;
|
||||||
|
import com.metamx.druid.aggregation.CountAggregatorFactory;
|
||||||
|
import com.metamx.druid.aggregation.LongSumAggregatorFactory;
|
||||||
|
import com.metamx.druid.aggregation.post.ArithmeticPostAggregator;
|
||||||
|
import com.metamx.druid.aggregation.post.ConstantPostAggregator;
|
||||||
|
import com.metamx.druid.aggregation.post.FieldAccessPostAggregator;
|
||||||
|
import com.metamx.druid.aggregation.post.PostAggregator;
|
||||||
|
import com.metamx.druid.result.Result;
|
||||||
|
import com.metamx.druid.result.TimeseriesResultValue;
|
||||||
|
import junit.framework.Assert;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class TimeseriesBinaryFnTest
|
||||||
|
{
|
||||||
|
final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
|
||||||
|
final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
|
||||||
|
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L);
|
||||||
|
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
|
||||||
|
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
|
||||||
|
final ArithmeticPostAggregator addRowsIndexConstant = new ArithmeticPostAggregator(
|
||||||
|
"addRowsIndexConstant",
|
||||||
|
"+",
|
||||||
|
Lists.newArrayList(constant, rowsPostAgg, indexPostAgg)
|
||||||
|
);
|
||||||
|
final List<AggregatorFactory> aggregatorFactories = Arrays.asList(
|
||||||
|
rowsCount,
|
||||||
|
indexLongSum
|
||||||
|
);
|
||||||
|
final List<PostAggregator> postAggregators = Arrays.<PostAggregator>asList(
|
||||||
|
addRowsIndexConstant
|
||||||
|
);
|
||||||
|
final DateTime currTime = new DateTime();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMerge()
|
||||||
|
{
|
||||||
|
Result<TimeseriesResultValue> result1 = new Result<TimeseriesResultValue>(
|
||||||
|
currTime,
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"rows", 1L,
|
||||||
|
"index", 2L
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
Result<TimeseriesResultValue> result2 = new Result<TimeseriesResultValue>(
|
||||||
|
currTime,
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"rows", 2L,
|
||||||
|
"index", 3L
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Result<TimeseriesResultValue> expected = new Result<TimeseriesResultValue>(
|
||||||
|
currTime,
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"rows", 3L,
|
||||||
|
"index", 5L,
|
||||||
|
"addRowsIndexConstant", 9.0
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn(
|
||||||
|
QueryGranularity.ALL,
|
||||||
|
aggregatorFactories,
|
||||||
|
postAggregators
|
||||||
|
).apply(
|
||||||
|
result1,
|
||||||
|
result2
|
||||||
|
);
|
||||||
|
Assert.assertEquals(expected, actual);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMergeDay()
|
||||||
|
{
|
||||||
|
Result<TimeseriesResultValue> result1 = new Result<TimeseriesResultValue>(
|
||||||
|
currTime,
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"rows", 1L,
|
||||||
|
"index", 2L
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
Result<TimeseriesResultValue> result2 = new Result<TimeseriesResultValue>(
|
||||||
|
currTime,
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"rows", 2L,
|
||||||
|
"index", 3L
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Result<TimeseriesResultValue> expected = new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime(QueryGranularity.DAY.truncate(currTime.getMillis())),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"rows", 3L,
|
||||||
|
"index", 5L,
|
||||||
|
"addRowsIndexConstant", 9.0
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn(
|
||||||
|
QueryGranularity.DAY,
|
||||||
|
aggregatorFactories,
|
||||||
|
postAggregators
|
||||||
|
).apply(
|
||||||
|
result1,
|
||||||
|
result2
|
||||||
|
);
|
||||||
|
Assert.assertEquals(expected, actual);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMergeOneNullResult()
|
||||||
|
{
|
||||||
|
Result<TimeseriesResultValue> result1 = new Result<TimeseriesResultValue>(
|
||||||
|
currTime,
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"rows", 1L,
|
||||||
|
"index", 2L
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
Result<TimeseriesResultValue> result2 = null;
|
||||||
|
|
||||||
|
Result<TimeseriesResultValue> expected = result1;
|
||||||
|
|
||||||
|
Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn(
|
||||||
|
QueryGranularity.ALL,
|
||||||
|
aggregatorFactories,
|
||||||
|
postAggregators
|
||||||
|
).apply(
|
||||||
|
result1,
|
||||||
|
result2
|
||||||
|
);
|
||||||
|
Assert.assertEquals(expected, actual);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMergeShiftedTimestamp()
|
||||||
|
{
|
||||||
|
Result<TimeseriesResultValue> result1 = new Result<TimeseriesResultValue>(
|
||||||
|
currTime,
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"rows", 1L,
|
||||||
|
"index", 2L
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
Result<TimeseriesResultValue> result2 = new Result<TimeseriesResultValue>(
|
||||||
|
currTime.plusHours(2),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"rows", 2L,
|
||||||
|
"index", 3L
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Result<TimeseriesResultValue> expected = new Result<TimeseriesResultValue>(
|
||||||
|
currTime,
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"rows", 3L,
|
||||||
|
"index", 5L,
|
||||||
|
"addRowsIndexConstant", 9.0
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn(
|
||||||
|
QueryGranularity.ALL,
|
||||||
|
aggregatorFactories,
|
||||||
|
postAggregators
|
||||||
|
).apply(
|
||||||
|
result1,
|
||||||
|
result2
|
||||||
|
);
|
||||||
|
Assert.assertEquals(expected, actual);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue