mirror of https://github.com/apache/druid.git

commit 5f2c3cec1b
Merge branch 'master' of github.com:metamx/druid
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.2.0-SNAPSHOT</version>
+        <version>0.2.3-SNAPSHOT</version>
    </parent>

    <dependencies>
@@ -19,8 +19,11 @@
 package com.metamx.druid.client.cache;

+import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;

+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Map;

@@ -48,6 +51,14 @@ public interface Cache
       this.key = key;
     }

+    public byte[] toByteArray() {
+      final byte[] nsBytes = this.namespace.getBytes(Charsets.UTF_8);
+      return ByteBuffer.allocate(Ints.BYTES + nsBytes.length + this.key.length)
+                       .putInt(nsBytes.length)
+                       .put(nsBytes)
+                       .put(this.key).array();
+    }
+
    @Override
    public boolean equals(Object o)
    {
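Note: the toByteArray layout above is [4-byte namespace length][namespace as UTF-8][raw key bytes]; deserializeValue later compares stored bytes against it to detect collisions. A minimal sketch of reading that layout back (illustrative only, not part of this commit):

    // Hypothetical reader for the NamedKey layout; namedKeyBytes is assumed
    // to be the output of NamedKey.toByteArray().
    ByteBuffer buf = ByteBuffer.wrap(namedKeyBytes);
    int nsLength = buf.getInt();            // 4-byte namespace length
    byte[] nsBytes = new byte[nsLength];
    buf.get(nsBytes);                       // namespace, UTF-8 encoded
    byte[] keyBytes = new byte[buf.remaining()];
    buf.get(keyBytes);                      // remainder is the raw key
    String namespace = new String(nsBytes, Charsets.UTF_8);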
@@ -20,9 +20,10 @@
 package com.metamx.druid.client.cache;

 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
-import net.iharder.base64.Base64;
+import com.google.common.primitives.Ints;
 import net.spy.memcached.AddrUtil;
 import net.spy.memcached.ConnectionFactoryBuilder;
 import net.spy.memcached.DefaultHashAlgorithm;
@@ -31,9 +32,12 @@ import net.spy.memcached.MemcachedClient;
 import net.spy.memcached.MemcachedClientIF;
 import net.spy.memcached.internal.BulkFuture;
 import net.spy.memcached.transcoders.SerializingTranscoder;
+import org.apache.commons.codec.digest.DigestUtils;

 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -62,6 +66,7 @@ public class MemcachedCache implements Cache
             .build(),
             AddrUtil.getAddresses(config.getHosts())
         ),
+        config.getMemcachedPrefix(),
         config.getTimeout(),
         config.getExpiration()
     );
@@ -72,6 +77,7 @@ public class MemcachedCache implements Cache

   private final int timeout;
   private final int expiration;
+  private final String memcachedPrefix;

   private final MemcachedClientIF client;

@@ -79,10 +85,15 @@ public class MemcachedCache implements Cache
   private final AtomicLong missCount = new AtomicLong(0);
   private final AtomicLong timeoutCount = new AtomicLong(0);

-  MemcachedCache(MemcachedClientIF client, int timeout, int expiration) {
+  MemcachedCache(MemcachedClientIF client, String memcachedPrefix, int timeout, int expiration) {
+    Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH,
+                                "memcachedPrefix length [%d] exceeds maximum length [%d]",
+                                memcachedPrefix.length(),
+                                MAX_PREFIX_LENGTH);
     this.timeout = timeout;
     this.expiration = expiration;
     this.client = client;
+    this.memcachedPrefix = memcachedPrefix;
   }

   @Override
@@ -101,7 +112,7 @@ public class MemcachedCache implements Cache
   @Override
   public byte[] get(NamedKey key)
   {
-    Future<Object> future = client.asyncGet(computeKeyString(key));
+    Future<Object> future = client.asyncGet(computeKeyHash(memcachedPrefix, key));
     try {
       byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
       if(bytes != null) {
@@ -110,7 +121,7 @@ public class MemcachedCache implements Cache
       else {
         missCount.incrementAndGet();
       }
-      return bytes;
+      return bytes == null ? null : deserializeValue(key, bytes);
     }
     catch(TimeoutException e) {
       timeoutCount.incrementAndGet();
@@ -129,7 +140,30 @@ public class MemcachedCache implements Cache
   @Override
   public void put(NamedKey key, byte[] value)
   {
-    client.set(computeKeyString(key), expiration, value);
+    client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value));
+  }
+
+  private static byte[] serializeValue(NamedKey key, byte[] value) {
+    byte[] keyBytes = key.toByteArray();
+    return ByteBuffer.allocate(Ints.BYTES + keyBytes.length + value.length)
+                     .putInt(keyBytes.length)
+                     .put(keyBytes)
+                     .put(value)
+                     .array();
+  }
+
+  private static byte[] deserializeValue(NamedKey key, byte[] bytes) {
+    ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+    final int keyLength = buf.getInt();
+    byte[] keyBytes = new byte[keyLength];
+    buf.get(keyBytes);
+    byte[] value = new byte[buf.remaining()];
+    buf.get(value);
+
+    Preconditions.checkState(Arrays.equals(keyBytes, key.toByteArray()),
+                             "Keys do not match, possible hash collision?");
+    return value;
+  }
+
   @Override
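Note: serializeValue frames the stored entry as [4-byte key length][full NamedKey bytes][value bytes], so deserializeValue can verify on read that the entry really belongs to the requested key; since memcached keys are now SHA-1 hashes, this is the collision guard. A hedged round-trip sketch using the methods above (the literals are placeholders):

    NamedKey key = new NamedKey("namespace", "some key".getBytes(Charsets.UTF_8));
    byte[] stored = serializeValue(key, "some value".getBytes(Charsets.UTF_8));
    // Throws IllegalStateException if the embedded key does not match,
    // i.e. two distinct keys hashed to the same memcached key string.
    byte[] value = deserializeValue(key, stored);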
@@ -144,7 +178,7 @@ public class MemcachedCache implements Cache
             @Nullable NamedKey input
           )
           {
-            return computeKeyString(input);
+            return computeKeyHash(memcachedPrefix, input);
           }
         }
     );
@@ -163,9 +197,11 @@ public class MemcachedCache implements Cache

     Map<NamedKey, byte[]> results = Maps.newHashMap();
     for(Map.Entry<String, Object> entry : some.entrySet()) {
+      final NamedKey key = keyLookup.get(entry.getKey());
+      final byte[] value = (byte[]) entry.getValue();
       results.put(
-          keyLookup.get(entry.getKey()),
-          (byte[])entry.getValue()
+          key,
+          value == null ? null : deserializeValue(key, value)
       );
     }

@@ -186,7 +222,15 @@ public class MemcachedCache implements Cache
     // no resources to cleanup
   }

-  private static String computeKeyString(NamedKey key) {
-    return key.namespace + ":" + Base64.encodeBytes(key.key, Base64.DONT_BREAK_LINES);
+  public static final int MAX_PREFIX_LENGTH =
+      MemcachedClientIF.MAX_KEY_LENGTH
+      - 40 // length of namespace hash
+      - 40 // length of key hash
+      - 2  // length of separators
+      ;
+
+  private static String computeKeyHash(String memcachedPrefix, NamedKey key) {
+    // hash keys to keep things under 250 characters for memcached
+    return memcachedPrefix + ":" + DigestUtils.sha1Hex(key.namespace) + ":" + DigestUtils.sha1Hex(key.key);
   }
 }
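Note on the arithmetic: memcached limits keys to MemcachedClientIF.MAX_KEY_LENGTH (250) characters. Each SHA-1 hex digest is 40 characters and the two ":" separators add 2, so the configurable prefix may be at most 250 - 40 - 40 - 2 = 168 characters, which is what MAX_PREFIX_LENGTH encodes. A small sketch of the bound (the prefix value is an assumed example):

    String prefix = "druid-memcached";  // hypothetical prefix
    String hashedKey = prefix + ":" + DigestUtils.sha1Hex("namespace")
                       + ":" + DigestUtils.sha1Hex("some key");
    // length = prefix + 2 separators + 2 * 40 hex chars
    assert hashedKey.length() == prefix.length() + 2 + 40 + 40;
    assert hashedKey.length() <= MemcachedClientIF.MAX_KEY_LENGTH;  // 250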
@@ -18,4 +18,7 @@ public abstract class MemcachedCacheConfig

   @Config("${prefix}.maxObjectSize")
   public abstract int getMaxObjectSize();
+
+  @Config("${prefix}.memcachedPrefix")
+  public abstract String getMemcachedPrefix();
 }
@@ -26,7 +26,7 @@ import org.junit.Test;

 /**
  */
-public class MapCacheBrokerTest
+public class MapCacheTest
 {
   private static final byte[] HI = "hi".getBytes();
   private static final byte[] HO = "ho".getBytes();
@@ -17,7 +17,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;

-public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
+public class MemcachedCacheBenchmark extends SimpleBenchmark
 {
   private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_";
   public static final String NAMESPACE = "default";
@@ -56,6 +56,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark

     cache = new MemcachedCache(
         client,
+        "druid-memcached-benchmark",
         30000, // 30 seconds
         3600   // 1 hour
     );
@@ -113,6 +114,6 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
   }

   public static void main(String[] args) throws Exception {
-    Runner.main(MemcachedCacheBrokerBenchmark.class, args);
+    Runner.main(MemcachedCacheBenchmark.class, args);
   }
 }
@@ -50,7 +50,7 @@ import java.util.concurrent.TimeoutException;

 /**
  */
-public class MemcachedCacheBrokerTest
+public class MemcachedCacheTest
 {
   private static final byte[] HI = "hi".getBytes();
   private static final byte[] HO = "ho".getBytes();
@@ -60,7 +60,7 @@ public class MemcachedCacheBrokerTest
   public void setUp() throws Exception
   {
     MemcachedClientIF client = new MockMemcachedClient();
-    cache = new MemcachedCache(client, 500, 3600);
+    cache = new MemcachedCache(client, "druid-memcached-test", 500, 3600);
   }

   @Test
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.2.0-SNAPSHOT</version>
+        <version>0.2.3-SNAPSHOT</version>
    </parent>

    <dependencies>
@@ -24,11 +24,11 @@
     <artifactId>druid-services</artifactId>
     <name>druid-services</name>
     <description>druid-services</description>
-    <version>0.2.0-SNAPSHOT</version>
+    <version>0.2.3-SNAPSHOT</version>
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.2.0-SNAPSHOT</version>
+        <version>0.2.3-SNAPSHOT</version>
    </parent>

    <dependencies>
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.2.0-SNAPSHOT</version>
+        <version>0.2.3-SNAPSHOT</version>
    </parent>

    <modules>
@@ -9,7 +9,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid-examples</artifactId>
-        <version>0.2.0-SNAPSHOT</version>
+        <version>0.2.3-SNAPSHOT</version>
    </parent>

    <dependencies>
@@ -9,7 +9,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid-examples</artifactId>
-        <version>0.2.0-SNAPSHOT</version>
+        <version>0.2.3-SNAPSHOT</version>
    </parent>

    <dependencies>
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.2.0-SNAPSHOT</version>
+        <version>0.2.3-SNAPSHOT</version>
    </parent>

    <dependencies>
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.2.0-SNAPSHOT</version>
+        <version>0.2.3-SNAPSHOT</version>
    </parent>

    <dependencies>
@@ -51,7 +51,7 @@
         <dependency>
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-java-sdk</artifactId>
-            <version>1.2.15</version>
+            <version>1.3.27</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.mail</groupId>
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.2.0-SNAPSHOT</version>
+        <version>0.2.3-SNAPSHOT</version>
    </parent>

    <dependencies>
@@ -177,22 +177,19 @@ public class RemoteTaskRunner implements TaskRunner
       return;
     }

-    List<WorkerWrapper> thoseLazyWorkers = Lists.newArrayList(
-        FunctionalIterable
-            .create(zkWorkers.values())
-            .filter(
-                new Predicate<WorkerWrapper>()
-                {
-                  @Override
-                  public boolean apply(WorkerWrapper input)
-                  {
-                    return input.getRunningTasks().isEmpty()
-                           && System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
-                              > config.getMaxWorkerIdleTimeMillisBeforeDeletion();
-                  }
-                }
-            )
-    );
+    int workerCount = 0;
+    List<WorkerWrapper> thoseLazyWorkers = Lists.newArrayList();
+    for (WorkerWrapper workerWrapper : zkWorkers.values()) {
+      workerCount++;
+
+      if (workerCount > workerSetupManager.getWorkerSetupData().getMinNumWorkers() &&
+          workerWrapper.getRunningTasks().isEmpty() &&
+          System.currentTimeMillis() - workerWrapper.getLastCompletedTaskTime().getMillis()
+              > config.getMaxWorkerIdleTimeMillisBeforeDeletion()
+          ) {
+        thoseLazyWorkers.add(workerWrapper);
+      }
+    }

     AutoScalingData terminated = strategy.terminate(
         Lists.transform(
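Note: besides dropping the FunctionalIterable/Predicate plumbing, the rewritten loop adds a floor to the pool: workerCount is incremented before the check, so the first getMinNumWorkers() workers encountered are never marked lazy, however long they have been idle. The selection rule in isolation (a sketch; minNumWorkers and maxIdleMillis stand in for the config lookups above):

    // A worker is eligible for termination only once the minimum pool size
    // is already accounted for, it runs no tasks, and it has idled too long.
    boolean lazy = workerCount > minNumWorkers
        && workerWrapper.getRunningTasks().isEmpty()
        && System.currentTimeMillis() - workerWrapper.getLastCompletedTaskTime().getMillis()
           > maxIdleMillis;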
@@ -38,7 +38,7 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig
   public abstract DateTime getTerminateResourcesOriginDateTime();

   @Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion")
-  @Default("10000")
+  @Default("600000")
   public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion();

   @Config("druid.indexer.maxScalingDuration")
pom.xml
@@ -23,7 +23,7 @@
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
     <packaging>pom</packaging>
-    <version>0.2.0-SNAPSHOT</version>
+    <version>0.2.3-SNAPSHOT</version>
    <name>druid</name>
    <description>druid</description>
    <scm>
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.2.0-SNAPSHOT</version>
+        <version>0.2.3-SNAPSHOT</version>
    </parent>

    <properties>
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.2.0-SNAPSHOT</version>
+        <version>0.2.3-SNAPSHOT</version>
    </parent>

    <dependencies>
@@ -215,7 +215,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
       while (baseIter.hasNext()) {
         currEntry.set(baseIter.next());
         if (filterMatcher.matches()) {
-          break;
+          return;
         }

         numAdvanced++;
@@ -22,6 +22,7 @@ package com.metamx.druid.query.group;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -41,6 +42,7 @@ import com.metamx.druid.query.dimension.DefaultDimensionSpec;
 import com.metamx.druid.query.dimension.DimensionSpec;
 import com.metamx.druid.query.segment.MultipleIntervalSegmentSpec;
 import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.joda.time.Interval;
 import org.joda.time.Period;
 import org.junit.Test;
@@ -150,6 +152,70 @@ public class GroupByQueryRunnerTest
     TestHelper.assertExpectedObjects(expectedResults, results, "");
   }

+  @Test
+  public void testGroupByWithTimeZone() {
+    DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
+
+    GroupByQuery query = GroupByQuery.builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setInterval("2011-03-31T00:00:00-07:00/2011-04-02T00:00:00-07:00")
+        .setDimensions(
+            Lists.newArrayList(
+                (DimensionSpec) new DefaultDimensionSpec(
+                    "quality",
+                    "alias"
+                )
+            )
+        )
+        .setAggregatorSpecs(
+            Arrays.<AggregatorFactory>asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory(
+                    "idx",
+                    "index"
+                )
+            )
+        )
+        .setGranularity(
+            new PeriodGranularity(
+                new Period("P1D"),
+                null,
+                tz
+            )
+        )
+        .build();
+
+    List<Row> expectedResults = Arrays.asList(
+        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "automotive", "rows", 1L, "idx", 135L)),
+        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "business", "rows", 1L, "idx", 118L)),
+        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "entertainment", "rows", 1L, "idx", 158L)),
+        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "health", "rows", 1L, "idx", 120L)),
+        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "mezzanine", "rows", 3L, "idx", 2870L)),
+        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "news", "rows", 1L, "idx", 121L)),
+        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "premium", "rows", 3L, "idx", 2900L)),
+        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "technology", "rows", 1L, "idx", 78L)),
+        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "travel", "rows", 1L, "idx", 119L)),
+
+        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "automotive", "rows", 1L, "idx", 147L)),
+        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "business", "rows", 1L, "idx", 112L)),
+        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "entertainment", "rows", 1L, "idx", 166L)),
+        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "health", "rows", 1L, "idx", 113L)),
+        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "mezzanine", "rows", 3L, "idx", 2447L)),
+        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "news", "rows", 1L, "idx", 114L)),
+        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "premium", "rows", 3L, "idx", 2505L)),
+        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "technology", "rows", 1L, "idx", 97L)),
+        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "travel", "rows", 1L, "idx", 126L))
+    );
+
+    Iterable<Row> results = Sequences.toList(
+        runner.run(query),
+        Lists.<Row>newArrayList()
+    );
+
+    TestHelper.assertExpectedObjects(expectedResults, results, "");
+  }
+
+
   @Test
   public void testMergeResults() {
     GroupByQuery.Builder builder = GroupByQuery
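Note: the point of the new test is that PeriodGranularity buckets rows on local-day boundaries in the supplied zone, so result timestamps come back as 2011-03-31 and 2011-04-01 in America/Los_Angeles rather than as UTC days. A Joda-Time sketch of the underlying behavior (illustrative instant, not taken from the test data):

    DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
    DateTime instant = new DateTime("2011-04-01T03:00:00Z");
    // Same instant, different day buckets depending on the zone:
    DateTime utcDay = instant.withZone(DateTimeZone.UTC).dayOfMonth().roundFloorCopy(); // 2011-04-01T00:00Z
    DateTime laDay  = instant.withZone(tz).dayOfMonth().roundFloorCopy();               // 2011-03-31T00:00-07:00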
@@ -0,0 +1,105 @@
+package com.metamx.druid.query.timeseries;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.metamx.common.guava.Sequences;
+import com.metamx.druid.Druids;
+import com.metamx.druid.Query;
+import com.metamx.druid.QueryGranularity;
+import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.aggregation.CountAggregatorFactory;
+import com.metamx.druid.index.IncrementalIndexSegment;
+import com.metamx.druid.index.Segment;
+import com.metamx.druid.index.v1.IncrementalIndex;
+import com.metamx.druid.input.MapBasedInputRow;
+import com.metamx.druid.query.FinalizeResultsQueryRunner;
+import com.metamx.druid.query.QueryRunner;
+import com.metamx.druid.query.QueryRunnerFactory;
+import com.metamx.druid.result.Result;
+import com.metamx.druid.result.TimeseriesResultValue;
+import junit.framework.Assert;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TimeseriesQueryRunnerBonusTest
+{
+  @Test
+  public void testOneRowAtATime() throws Exception
+  {
+    final IncrementalIndex oneRowIndex = new IncrementalIndex(
+        new DateTime("2012-01-01T00:00:00Z").getMillis(), QueryGranularity.NONE, new AggregatorFactory[]{}
+    );
+
+    List<Result<TimeseriesResultValue>> results;
+
+    oneRowIndex.add(
+        new MapBasedInputRow(
+            new DateTime("2012-01-01T00:00:00Z").getMillis(),
+            ImmutableList.of("dim1"),
+            ImmutableMap.<String, Object>of("dim1", "x")
+        )
+    );
+
+    results = runTimeseriesCount(oneRowIndex);
+
+    Assert.assertEquals("index size", 1, oneRowIndex.size());
+    Assert.assertEquals("result size", 1, results.size());
+    Assert.assertEquals("result timestamp", new DateTime("2012-01-01T00:00:00Z"), results.get(0).getTimestamp());
+    Assert.assertEquals("result count metric", 1, (long) results.get(0).getValue().getLongMetric("rows"));
+
+    oneRowIndex.add(
+        new MapBasedInputRow(
+            new DateTime("2012-01-01T00:00:00Z").getMillis(),
+            ImmutableList.of("dim1"),
+            ImmutableMap.<String, Object>of("dim1", "y")
+        )
+    );
+
+    results = runTimeseriesCount(oneRowIndex);
+
+    Assert.assertEquals("index size", 2, oneRowIndex.size());
+    Assert.assertEquals("result size", 1, results.size());
+    Assert.assertEquals("result timestamp", new DateTime("2012-01-01T00:00:00Z"), results.get(0).getTimestamp());
+    Assert.assertEquals("result count metric", 2, (long) results.get(0).getValue().getLongMetric("rows"));
+  }
+
+  private static List<Result<TimeseriesResultValue>> runTimeseriesCount(IncrementalIndex index)
+  {
+    final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory();
+    final QueryRunner<Result<TimeseriesResultValue>> runner = makeQueryRunner(
+        factory,
+        new IncrementalIndexSegment(index)
+    );
+
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource("xxx")
+                                  .granularity(QueryGranularity.ALL)
+                                  .intervals(ImmutableList.of(new Interval("2012-01-01T00:00:00Z/P1D")))
+                                  .aggregators(
+                                      ImmutableList.<AggregatorFactory>of(
+                                          new CountAggregatorFactory("rows")
+                                      )
+                                  )
+                                  .build();
+
+    return Sequences.toList(
+        runner.run(query),
+        Lists.<Result<TimeseriesResultValue>>newArrayList()
+    );
+  }
+
+  private static <T> QueryRunner<T> makeQueryRunner(
+      QueryRunnerFactory<T, Query<T>> factory,
+      Segment adapter
+  )
+  {
+    return new FinalizeResultsQueryRunner<T>(
+        factory.createRunner(adapter),
+        factory.getToolchest()
+    );
+  }
+}