Merge branch 'master' into spatial

This commit is contained in:
fjy 2013-05-14 16:10:18 -07:00
commit 5af188f18d
59 changed files with 1896 additions and 224 deletions

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version> <version>0.4.15-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.VersionedIntervalTimeline; import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.selector.QueryableDruidServer;
import com.metamx.druid.client.selector.ServerSelector; import com.metamx.druid.client.selector.ServerSelector;
import com.metamx.druid.partition.PartitionChunk; import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryRunner;
@ -44,7 +45,7 @@ public class BrokerServerView implements TimelineServerView
private final Object lock = new Object(); private final Object lock = new Object();
private final ConcurrentMap<DruidServer, DirectDruidClient> clients; private final ConcurrentMap<String, QueryableDruidServer> clients;
private final Map<String, ServerSelector> selectors; private final Map<String, ServerSelector> selectors;
private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines; private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines;
@ -107,7 +108,7 @@ public class BrokerServerView implements TimelineServerView
public void clear() public void clear()
{ {
synchronized (lock) { synchronized (lock) {
final Iterator<DruidServer> clientsIter = clients.keySet().iterator(); final Iterator<String> clientsIter = clients.keySet().iterator();
while (clientsIter.hasNext()) { while (clientsIter.hasNext()) {
clientsIter.remove(); clientsIter.remove();
} }
@ -119,7 +120,7 @@ public class BrokerServerView implements TimelineServerView
final ServerSelector selector = selectorsIter.next(); final ServerSelector selector = selectorsIter.next();
selectorsIter.remove(); selectorsIter.remove();
while (!selector.isEmpty()) { while (!selector.isEmpty()) {
final DruidServer pick = selector.pick(); final QueryableDruidServer pick = selector.pick();
selector.removeServer(pick); selector.removeServer(pick);
} }
} }
@ -128,7 +129,10 @@ public class BrokerServerView implements TimelineServerView
private void addServer(DruidServer server) private void addServer(DruidServer server)
{ {
QueryRunner exists = clients.put(server, makeDirectClient(server)); QueryableDruidServer exists = clients.put(
server.getName(),
new QueryableDruidServer(server, makeDirectClient(server))
);
if (exists != null) { if (exists != null) {
log.warn("QueryRunner for server[%s] already existed!?", server); log.warn("QueryRunner for server[%s] already existed!?", server);
} }
@ -141,7 +145,7 @@ public class BrokerServerView implements TimelineServerView
private void removeServer(DruidServer server) private void removeServer(DruidServer server)
{ {
clients.remove(server); clients.remove(server.getName());
for (DataSegment segment : server.getSegments().values()) { for (DataSegment segment : server.getSegments().values()) {
serverRemovedSegment(server, segment); serverRemovedSegment(server, segment);
} }
@ -167,10 +171,10 @@ public class BrokerServerView implements TimelineServerView
selectors.put(segmentId, selector); selectors.put(segmentId, selector);
} }
if (!clients.containsKey(server)) { if (!clients.containsKey(server.getName())) {
addServer(server); addServer(server);
} }
selector.addServer(server); selector.addServer(clients.get(server.getName()));
} }
} }
@ -188,7 +192,8 @@ public class BrokerServerView implements TimelineServerView
return; return;
} }
if (!selector.removeServer(server)) { QueryableDruidServer queryableDruidServer = clients.get(server.getName());
if (!selector.removeServer(queryableDruidServer)) {
log.warn( log.warn(
"Asked to disassociate non-existant association between server[%s] and segment[%s]", "Asked to disassociate non-existant association between server[%s] and segment[%s]",
server, server,
@ -228,7 +233,11 @@ public class BrokerServerView implements TimelineServerView
public <T> QueryRunner<T> getQueryRunner(DruidServer server) public <T> QueryRunner<T> getQueryRunner(DruidServer server)
{ {
synchronized (lock) { synchronized (lock) {
return clients.get(server); QueryableDruidServer queryableDruidServer = clients.get(server.getName());
if (queryableDruidServer == null) {
log.error("WTF?! No QueryableDruidServer found for %s", server.getName());
}
return queryableDruidServer.getClient();
} }
} }

View File

@ -203,7 +203,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
// Compile list of all segments not pulled from cache // Compile list of all segments not pulled from cache
for(Pair<ServerSelector, SegmentDescriptor> segment : segments) { for(Pair<ServerSelector, SegmentDescriptor> segment : segments) {
final DruidServer server = segment.lhs.pick(); final DruidServer server = segment.lhs.pick().getServer();
List<SegmentDescriptor> descriptors = serverSegments.get(server); List<SegmentDescriptor> descriptors = serverSegments.get(server);
if (descriptors == null) { if (descriptors == null) {

View File

@ -47,7 +47,6 @@ import com.metamx.http.client.HttpClient;
import com.metamx.http.client.io.AppendableByteArrayInputStream; import com.metamx.http.client.io.AppendableByteArrayInputStream;
import com.metamx.http.client.response.ClientResponse; import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.InputStreamResponseHandler; import com.metamx.http.client.response.InputStreamResponseHandler;
import org.jboss.netty.handler.codec.http.HttpChunk; import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpResponse; import org.jboss.netty.handler.codec.http.HttpResponse;
@ -60,6 +59,7 @@ import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
*/ */
@ -74,6 +74,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
private final HttpClient httpClient; private final HttpClient httpClient;
private final String host; private final String host;
private final AtomicInteger openConnections;
private final boolean isSmile; private final boolean isSmile;
public DirectDruidClient( public DirectDruidClient(
@ -88,7 +89,13 @@ public class DirectDruidClient<T> implements QueryRunner<T>
this.httpClient = httpClient; this.httpClient = httpClient;
this.host = host; this.host = host;
isSmile = this.objectMapper.getJsonFactory() instanceof SmileFactory; this.isSmile = this.objectMapper.getJsonFactory() instanceof SmileFactory;
this.openConnections = new AtomicInteger();
}
public int getNumOpenConnections()
{
return openConnections.get();
} }
@Override @Override
@ -121,6 +128,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
try { try {
log.debug("Querying url[%s]", url); log.debug("Querying url[%s]", url);
openConnections.getAndIncrement();
future = httpClient future = httpClient
.post(new URL(url)) .post(new URL(url))
.setContent(objectMapper.writeValueAsBytes(query)) .setContent(objectMapper.writeValueAsBytes(query))
@ -128,7 +136,6 @@ public class DirectDruidClient<T> implements QueryRunner<T>
.go( .go(
new InputStreamResponseHandler() new InputStreamResponseHandler()
{ {
long startTime; long startTime;
long byteCount = 0; long byteCount = 0;
@ -162,6 +169,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
stopTime - startTime, stopTime - startTime,
byteCount / (0.0001 * (stopTime - startTime)) byteCount / (0.0001 * (stopTime - startTime))
); );
openConnections.getAndDecrement();
return super.done(clientResponse); return super.done(clientResponse);
} }
} }

View File

@ -135,7 +135,7 @@ public class ServerInventoryView implements ServerView, InventoryView
@Override @Override
public void deadContainer(DruidServer deadContainer) public void deadContainer(DruidServer deadContainer)
{ {
log.info("Server Disdappeared[%s]", deadContainer); log.info("Server Disappeared[%s]", deadContainer);
runServerCallbacks(deadContainer); runServerCallbacks(deadContainer);
} }

View File

@ -0,0 +1,47 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.client.selector;
import com.metamx.druid.client.DirectDruidClient;
import com.metamx.druid.client.DruidServer;
/**
*/
public class QueryableDruidServer
{
private final DruidServer server;
private final DirectDruidClient client;
public QueryableDruidServer(DruidServer server, DirectDruidClient client)
{
this.server = server;
this.client = client;
}
public DruidServer getServer()
{
return server;
}
public DirectDruidClient getClient()
{
return client;
}
}

View File

@ -19,21 +19,30 @@
package com.metamx.druid.client.selector; package com.metamx.druid.client.selector;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.DruidServer;
import java.util.LinkedHashSet; import java.util.Collections;
import java.util.Random; import java.util.Comparator;
import java.util.Set;
/** /**
*/ */
public class ServerSelector public class ServerSelector
{ {
private static final Random random = new Random(); private static final Comparator<QueryableDruidServer> comparator = new Comparator<QueryableDruidServer>()
{
@Override
public int compare(QueryableDruidServer left, QueryableDruidServer right)
{
return Ints.compare(left.getClient().getNumOpenConnections(), right.getClient().getNumOpenConnections());
}
};
private final Set<QueryableDruidServer> servers = Sets.newHashSet();
private final LinkedHashSet<DruidServer> servers = Sets.newLinkedHashSet();
private final DataSegment segment; private final DataSegment segment;
public ServerSelector( public ServerSelector(
@ -49,7 +58,7 @@ public class ServerSelector
} }
public void addServer( public void addServer(
DruidServer server QueryableDruidServer server
) )
{ {
synchronized (this) { synchronized (this) {
@ -57,7 +66,7 @@ public class ServerSelector
} }
} }
public boolean removeServer(DruidServer server) public boolean removeServer(QueryableDruidServer server)
{ {
synchronized (this) { synchronized (this) {
return servers.remove(server); return servers.remove(server);
@ -71,15 +80,10 @@ public class ServerSelector
} }
} }
public DruidServer pick() public QueryableDruidServer pick()
{ {
synchronized (this) { synchronized (this) {
final int size = servers.size(); return Collections.min(servers, comparator);
switch (size) {
case 0: return null;
case 1: return servers.iterator().next();
default: return Iterables.get(servers, random.nextInt(size));
}
} }
} }
} }

View File

@ -22,11 +22,14 @@ package com.metamx.druid.query.group;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.BaseQuery; import com.metamx.druid.BaseQuery;
import com.metamx.druid.Query; import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity; import com.metamx.druid.QueryGranularity;
@ -37,13 +40,15 @@ import com.metamx.druid.query.Queries;
import com.metamx.druid.query.dimension.DefaultDimensionSpec; import com.metamx.druid.query.dimension.DefaultDimensionSpec;
import com.metamx.druid.query.dimension.DimensionSpec; import com.metamx.druid.query.dimension.DimensionSpec;
import com.metamx.druid.query.filter.DimFilter; import com.metamx.druid.query.filter.DimFilter;
import com.metamx.druid.query.group.limit.DefaultLimitSpec; import com.metamx.druid.query.group.having.HavingSpec;
import com.metamx.druid.query.group.limit.LimitSpec; import com.metamx.druid.query.group.orderby.DefaultLimitSpec;
import com.metamx.druid.query.group.limit.NoopLimitSpec; import com.metamx.druid.query.group.orderby.LimitSpec;
import com.metamx.druid.query.group.limit.OrderByColumnSpec; import com.metamx.druid.query.group.orderby.NoopLimitSpec;
import com.metamx.druid.query.group.orderby.OrderByColumnSpec;
import com.metamx.druid.query.segment.LegacySegmentSpec; import com.metamx.druid.query.segment.LegacySegmentSpec;
import com.metamx.druid.query.segment.QuerySegmentSpec; import com.metamx.druid.query.segment.QuerySegmentSpec;
import javax.annotation.Nullable;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -57,6 +62,7 @@ public class GroupByQuery extends BaseQuery<Row>
} }
private final LimitSpec limitSpec; private final LimitSpec limitSpec;
private final HavingSpec havingSpec;
private final DimFilter dimFilter; private final DimFilter dimFilter;
private final QueryGranularity granularity; private final QueryGranularity granularity;
private final List<DimensionSpec> dimensions; private final List<DimensionSpec> dimensions;
@ -69,34 +75,88 @@ public class GroupByQuery extends BaseQuery<Row>
public GroupByQuery( public GroupByQuery(
@JsonProperty("dataSource") String dataSource, @JsonProperty("dataSource") String dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("limitSpec") LimitSpec limitSpec,
@JsonProperty("filter") DimFilter dimFilter, @JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("granularity") QueryGranularity granularity, @JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("dimensions") List<DimensionSpec> dimensions, @JsonProperty("dimensions") List<DimensionSpec> dimensions,
@JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs, @JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs, @JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
@JsonProperty("having") HavingSpec havingSpec,
@JsonProperty("limitSpec") LimitSpec limitSpec,
@JsonProperty("orderBy") LimitSpec orderBySpec,
@JsonProperty("context") Map<String, String> context @JsonProperty("context") Map<String, String> context
) )
{ {
super(dataSource, querySegmentSpec, context); super(dataSource, querySegmentSpec, context);
this.limitSpec = (limitSpec == null) ? new NoopLimitSpec() : limitSpec;
this.dimFilter = dimFilter; this.dimFilter = dimFilter;
this.granularity = granularity; this.granularity = granularity;
this.dimensions = dimensions == null ? ImmutableList.<DimensionSpec>of() : dimensions; this.dimensions = dimensions == null ? ImmutableList.<DimensionSpec>of() : dimensions;
this.aggregatorSpecs = aggregatorSpecs; this.aggregatorSpecs = aggregatorSpecs;
this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.<PostAggregator>of() : postAggregatorSpecs; this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.<PostAggregator>of() : postAggregatorSpecs;
this.havingSpec = havingSpec;
this.limitSpec = (limitSpec == null) ? (orderBySpec == null ? new NoopLimitSpec() : orderBySpec) : limitSpec;
Preconditions.checkNotNull(this.granularity, "Must specify a granularity"); Preconditions.checkNotNull(this.granularity, "Must specify a granularity");
Preconditions.checkNotNull(this.aggregatorSpecs, "Must specify at least one aggregator"); Preconditions.checkNotNull(this.aggregatorSpecs, "Must specify at least one aggregator");
Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs); Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs);
orderByLimitFn = this.limitSpec.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs); Function<Sequence<Row>, Sequence<Row>> postProcFn =
this.limitSpec.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);
if (havingSpec != null) {
postProcFn = Functions.compose(
new Function<Sequence<Row>, Sequence<Row>>()
{
@Override
public Sequence<Row> apply(@Nullable Sequence<Row> input)
{
return Sequences.filter(
input,
new Predicate<Row>()
{
@Override
public boolean apply(@Nullable Row input)
{
return GroupByQuery.this.havingSpec.eval(input);
}
}
);
}
},
postProcFn
);
}
orderByLimitFn = postProcFn;
} }
@JsonProperty /**
public LimitSpec getLimitSpec() * A private constructor that avoids all of the various state checks. Used by the with*() methods where the checks
* have already passed in order for the object to exist.
*/
private GroupByQuery(
String dataSource,
QuerySegmentSpec querySegmentSpec,
DimFilter dimFilter,
QueryGranularity granularity,
List<DimensionSpec> dimensions,
List<AggregatorFactory> aggregatorSpecs,
List<PostAggregator> postAggregatorSpecs,
HavingSpec havingSpec,
LimitSpec orderBySpec,
Function<Sequence<Row>, Sequence<Row>> orderByLimitFn,
Map<String, String> context
)
{ {
return limitSpec; super(dataSource, querySegmentSpec, context);
this.dimFilter = dimFilter;
this.granularity = granularity;
this.dimensions = dimensions;
this.aggregatorSpecs = aggregatorSpecs;
this.postAggregatorSpecs = postAggregatorSpecs;
this.havingSpec = havingSpec;
this.limitSpec = orderBySpec;
this.orderByLimitFn = orderByLimitFn;
} }
@JsonProperty("filter") @JsonProperty("filter")
@ -129,6 +189,18 @@ public class GroupByQuery extends BaseQuery<Row>
return postAggregatorSpecs; return postAggregatorSpecs;
} }
@JsonProperty("having")
public HavingSpec getHavingSpec()
{
return havingSpec;
}
@JsonProperty
public LimitSpec getOrderBy()
{
return limitSpec;
}
@Override @Override
public boolean hasFilters() public boolean hasFilters()
{ {
@ -152,12 +224,14 @@ public class GroupByQuery extends BaseQuery<Row>
return new GroupByQuery( return new GroupByQuery(
getDataSource(), getDataSource(),
getQuerySegmentSpec(), getQuerySegmentSpec(),
limitSpec,
dimFilter, dimFilter,
granularity, granularity,
dimensions, dimensions,
aggregatorSpecs, aggregatorSpecs,
postAggregatorSpecs, postAggregatorSpecs,
havingSpec,
limitSpec,
orderByLimitFn,
computeOverridenContext(contextOverride) computeOverridenContext(contextOverride)
); );
} }
@ -168,12 +242,14 @@ public class GroupByQuery extends BaseQuery<Row>
return new GroupByQuery( return new GroupByQuery(
getDataSource(), getDataSource(),
spec, spec,
limitSpec,
dimFilter, dimFilter,
granularity, granularity,
dimensions, dimensions,
aggregatorSpecs, aggregatorSpecs,
postAggregatorSpecs, postAggregatorSpecs,
havingSpec,
limitSpec,
orderByLimitFn,
getContext() getContext()
); );
} }
@ -187,6 +263,8 @@ public class GroupByQuery extends BaseQuery<Row>
private List<DimensionSpec> dimensions; private List<DimensionSpec> dimensions;
private List<AggregatorFactory> aggregatorSpecs; private List<AggregatorFactory> aggregatorSpecs;
private List<PostAggregator> postAggregatorSpecs; private List<PostAggregator> postAggregatorSpecs;
private HavingSpec havingSpec;
private Map<String, String> context; private Map<String, String> context;
private LimitSpec limitSpec = null; private LimitSpec limitSpec = null;
@ -205,6 +283,9 @@ public class GroupByQuery extends BaseQuery<Row>
dimensions = builder.dimensions; dimensions = builder.dimensions;
aggregatorSpecs = builder.aggregatorSpecs; aggregatorSpecs = builder.aggregatorSpecs;
postAggregatorSpecs = builder.postAggregatorSpecs; postAggregatorSpecs = builder.postAggregatorSpecs;
havingSpec = builder.havingSpec;
limit = builder.limit;
context = builder.context; context = builder.context;
} }
@ -264,7 +345,7 @@ public class GroupByQuery extends BaseQuery<Row>
private void ensureFluentLimitsNotSet() private void ensureFluentLimitsNotSet()
{ {
if (! (limit == Integer.MAX_VALUE && orderByColumnSpecs.isEmpty()) ) { if (!(limit == Integer.MAX_VALUE && orderByColumnSpecs.isEmpty())) {
throw new ISE("Ambiguous build, limit[%s] or columnSpecs[%s] already set.", limit, orderByColumnSpecs); throw new ISE("Ambiguous build, limit[%s] or columnSpecs[%s] already set.", limit, orderByColumnSpecs);
} }
} }
@ -351,6 +432,20 @@ public class GroupByQuery extends BaseQuery<Row>
return this; return this;
} }
public Builder setHavingSpec(HavingSpec havingSpec)
{
this.havingSpec = havingSpec;
return this;
}
public Builder setLimit(Integer limit)
{
this.limit = limit;
return this;
}
public Builder copy() public Builder copy()
{ {
return new Builder(this); return new Builder(this);
@ -361,20 +456,21 @@ public class GroupByQuery extends BaseQuery<Row>
final LimitSpec theLimitSpec; final LimitSpec theLimitSpec;
if (limitSpec == null) { if (limitSpec == null) {
theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit); theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
} } else {
else {
theLimitSpec = limitSpec; theLimitSpec = limitSpec;
} }
return new GroupByQuery( return new GroupByQuery(
dataSource, dataSource,
querySegmentSpec, querySegmentSpec,
theLimitSpec,
dimFilter, dimFilter,
granularity, granularity,
dimensions, dimensions,
aggregatorSpecs, aggregatorSpecs,
postAggregatorSpecs, postAggregatorSpecs,
havingSpec,
null,
theLimitSpec,
context context
); );
} }

View File

@ -0,0 +1,14 @@
package com.metamx.druid.query.group.having;
import com.metamx.druid.input.Row;
/**
*/
public class AlwaysHavingSpec implements HavingSpec
{
@Override
public boolean eval(Row row)
{
return true;
}
}

View File

@ -0,0 +1,75 @@
package com.metamx.druid.query.group.having;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.metamx.druid.input.Row;
import java.util.List;
/**
* The logical "and" operator for the "having" clause.
*/
public class AndHavingSpec implements HavingSpec
{
private List<HavingSpec> havingSpecs;
@JsonCreator
public AndHavingSpec(@JsonProperty("havingSpecs") List<HavingSpec> havingSpecs)
{
this.havingSpecs = havingSpecs == null ? ImmutableList.<HavingSpec>of() : havingSpecs;
}
@JsonProperty("havingSpecs")
public List<HavingSpec> getHavingSpecs()
{
return havingSpecs;
}
@Override
public boolean eval(Row row)
{
for (HavingSpec havingSpec : havingSpecs) {
if (!havingSpec.eval(row)) {
return false;
}
}
return true;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AndHavingSpec that = (AndHavingSpec) o;
if (havingSpecs != null ? !havingSpecs.equals(that.havingSpecs) : that.havingSpecs != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return havingSpecs != null ? havingSpecs.hashCode() : 0;
}
@Override
public String toString()
{
final StringBuilder sb = new StringBuilder();
sb.append("AndHavingSpec");
sb.append("{havingSpecs=").append(havingSpecs);
sb.append('}');
return sb.toString();
}
}

View File

@ -0,0 +1,94 @@
package com.metamx.druid.query.group.having;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.druid.input.Row;
/**
* The "=" operator in a "having" clause. This is similar to SQL's "having aggregation = value",
* except that in SQL an aggregation is an expression instead of an aggregation name as in Druid.
*/
public class EqualToHavingSpec implements HavingSpec
{
private String aggregationName;
private Number value;
@JsonCreator
public EqualToHavingSpec(
@JsonProperty("aggregation") String aggName,
@JsonProperty("value") Number value
)
{
this.aggregationName = aggName;
this.value = value;
}
@JsonProperty("value")
public Number getValue()
{
return value;
}
@JsonProperty("aggregation")
public String getAggregationName()
{
return aggregationName;
}
@Override
public boolean eval(Row row)
{
float metricValue = row.getFloatMetric(aggregationName);
return Float.compare(value.floatValue(), metricValue) == 0;
}
/**
* This method treats internal value as double mainly for ease of test.
*/
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EqualToHavingSpec that = (EqualToHavingSpec) o;
if (aggregationName != null ? !aggregationName.equals(that.aggregationName) : that.aggregationName != null) {
return false;
}
if (value != null && that.value != null) {
return Double.compare(value.doubleValue(), that.value.doubleValue()) == 0;
}
if (value == null && that.value == null) {
return true;
}
return false;
}
@Override
public int hashCode()
{
int result = aggregationName != null ? aggregationName.hashCode() : 0;
result = 31 * result + (value != null ? value.hashCode() : 0);
return result;
}
@Override
public String toString()
{
final StringBuilder sb = new StringBuilder();
sb.append("EqualToHavingSpec");
sb.append("{aggregationName='").append(aggregationName).append('\'');
sb.append(", value=").append(value);
sb.append('}');
return sb.toString();
}
}

View File

@ -0,0 +1,90 @@
package com.metamx.druid.query.group.having;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.druid.input.Row;
/**
* The ">" operator in a "having" clause. This is similar to SQL's "having aggregation > value",
* except that an aggregation in SQL is an expression instead of an aggregation name as in Druid.
*/
public class GreaterThanHavingSpec implements HavingSpec
{
private String aggregationName;
private Number value;
@JsonCreator
public GreaterThanHavingSpec(
@JsonProperty("aggregation") String aggName,
@JsonProperty("value") Number value
)
{
this.aggregationName = aggName;
this.value = value;
}
@JsonProperty("aggregation")
public String getAggregationName()
{
return aggregationName;
}
@JsonProperty("value")
public Number getValue()
{
return value;
}
@Override
public boolean eval(Row row)
{
float metricValue = row.getFloatMetric(aggregationName);
return Float.compare(metricValue, value.floatValue()) > 0;
}
/**
* This method treats internal value as double mainly for ease of test.
*/
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
GreaterThanHavingSpec that = (GreaterThanHavingSpec) o;
if (value != null && that.value != null) {
return Double.compare(value.doubleValue(), that.value.doubleValue()) == 0;
}
if (value == null && that.value == null) {
return true;
}
return false;
}
@Override
public int hashCode()
{
int result = aggregationName != null ? aggregationName.hashCode() : 0;
result = 31 * result + (value != null ? value.hashCode() : 0);
return result;
}
@Override
public String toString()
{
final StringBuilder sb = new StringBuilder();
sb.append("GreaterThanHavingSpec");
sb.append("{aggregationName='").append(aggregationName).append('\'');
sb.append(", value=").append(value);
sb.append('}');
return sb.toString();
}
}

View File

@ -0,0 +1,51 @@
package com.metamx.druid.query.group.having;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.druid.input.Row;
/**
* A "having" clause that filters aggregated value. This is similar to SQL's "having"
* clause.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = AlwaysHavingSpec.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "and", value = AndHavingSpec.class),
@JsonSubTypes.Type(name = "or", value = OrHavingSpec.class),
@JsonSubTypes.Type(name = "not", value = NotHavingSpec.class),
@JsonSubTypes.Type(name = "greaterThan", value = GreaterThanHavingSpec.class),
@JsonSubTypes.Type(name = "lessThan", value = LessThanHavingSpec.class),
@JsonSubTypes.Type(name = "equalTo", value = EqualToHavingSpec.class)
})
public interface HavingSpec
{
/**
* Evaluates if a given row satisfies the having spec.
*
* @param row A Row of data that may contain aggregated values
*
* @return true if the given row satisfies the having spec. False otherwise.
*
* @see Row
*/
public boolean eval(Row row);
// Atoms for easy combination, but for now they are mostly useful
// for testing.
/**
* A "having" spec that always evaluates to false
*/
public static final HavingSpec NEVER = new HavingSpec()
{
@Override
public boolean eval(Row row)
{
return false;
}
};
/**
* A "having" spec that always evaluates to true
*/
public static final HavingSpec ALWAYS = new AlwaysHavingSpec();
}

View File

@ -0,0 +1,89 @@
package com.metamx.druid.query.group.having;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.druid.input.Row;
/**
* The "<" operator in a "having" clause. This is similar to SQL's "having aggregation < value",
* except that an aggregation in SQL is an expression instead of an aggregation name as in Druid.
*/
public class LessThanHavingSpec implements HavingSpec
{
private String aggregationName;
private Number value;
public LessThanHavingSpec
(
@JsonProperty("aggregation") String aggName,
@JsonProperty("value") Number value
)
{
this.aggregationName = aggName;
this.value = value;
}
@JsonProperty("aggregation")
public String getAggregationName()
{
return aggregationName;
}
@JsonProperty("value")
public Number getValue()
{
return value;
}
@Override
public boolean eval(Row row)
{
float metricValue = row.getFloatMetric(aggregationName);
return Float.compare(metricValue, value.floatValue()) < 0;
}
/**
* This method treats internal value as double mainly for ease of test.
*/
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LessThanHavingSpec that = (LessThanHavingSpec) o;
if (value != null && that.value != null) {
return Double.compare(value.doubleValue(), that.value.doubleValue()) == 0;
}
if (value == null && that.value == null) {
return true;
}
return false;
}
@Override
public int hashCode()
{
int result = aggregationName != null ? aggregationName.hashCode() : 0;
result = 31 * result + (value != null ? value.hashCode() : 0);
return result;
}
@Override
public String toString()
{
final StringBuilder sb = new StringBuilder();
sb.append("LessThanHavingSpec");
sb.append("{aggregationName='").append(aggregationName).append('\'');
sb.append(", value=").append(value);
sb.append('}');
return sb.toString();
}
}

View File

@ -0,0 +1,66 @@
package com.metamx.druid.query.group.having;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.druid.input.Row;
/**
* The logical "not" operator for the "having" clause.
*/
public class NotHavingSpec implements HavingSpec
{
private HavingSpec havingSpec;
@JsonCreator
public NotHavingSpec(@JsonProperty("havingSpec") HavingSpec havingSpec)
{
this.havingSpec = havingSpec;
}
@JsonProperty("havingSpec")
public HavingSpec getHavingSpec()
{
return havingSpec;
}
@Override
public boolean eval(Row row)
{
return !havingSpec.eval(row);
}
@Override
public String toString()
{
final StringBuilder sb = new StringBuilder();
sb.append("NotHavingSpec");
sb.append("{havingSpec=").append(havingSpec);
sb.append('}');
return sb.toString();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
NotHavingSpec that = (NotHavingSpec) o;
if (havingSpec != null ? !havingSpec.equals(that.havingSpec) : that.havingSpec != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return havingSpec != null ? havingSpec.hashCode() : 0;
}
}

View File

@ -0,0 +1,67 @@
package com.metamx.druid.query.group.having;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.metamx.druid.input.Row;
import java.util.List;
/**
* The logical "or" operator for the "having" clause.
*/
public class OrHavingSpec implements HavingSpec
{
private List<HavingSpec> havingSpecs;
@JsonCreator
public OrHavingSpec(@JsonProperty("havingSpecs") List<HavingSpec> havingSpecs) {
this.havingSpecs = havingSpecs == null ? ImmutableList.<HavingSpec>of() : havingSpecs;
}
@JsonProperty("havingSpecs")
public List<HavingSpec> getHavingSpecs(){
return havingSpecs;
}
@Override
public boolean eval(Row row)
{
for(HavingSpec havingSpec: havingSpecs) {
if(havingSpec.eval(row)){
return true;
}
}
return false;
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OrHavingSpec that = (OrHavingSpec) o;
if (havingSpecs != null ? !havingSpecs.equals(that.havingSpecs) : that.havingSpecs != null) return false;
return true;
}
@Override
public int hashCode()
{
return havingSpecs != null ? havingSpecs.hashCode() : 0;
}
@Override
public String toString()
{
final StringBuilder sb = new StringBuilder();
sb.append("OrHavingSpec");
sb.append("{havingSpecs=").append(havingSpecs);
sb.append('}');
return sb.toString();
}
}

View File

@ -17,13 +17,14 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package com.metamx.druid.query.group.limit; package com.metamx.druid.query.group.orderby;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
@ -35,6 +36,8 @@ import com.metamx.druid.aggregation.post.PostAggregator;
import com.metamx.druid.input.Row; import com.metamx.druid.input.Row;
import com.metamx.druid.query.dimension.DimensionSpec; import com.metamx.druid.query.dimension.DimensionSpec;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -55,7 +58,7 @@ public class DefaultLimitSpec implements LimitSpec
this.columns = (columns == null) ? ImmutableList.<OrderByColumnSpec>of() : columns; this.columns = (columns == null) ? ImmutableList.<OrderByColumnSpec>of() : columns;
this.limit = (limit == null) ? Integer.MAX_VALUE : limit; this.limit = (limit == null) ? Integer.MAX_VALUE : limit;
Preconditions.checkState(limit > 0, "limit[%s] must be >0", limit); Preconditions.checkArgument(this.limit > 0, "limit[%s] must be >0", limit);
} }
@JsonProperty @JsonProperty
@ -75,20 +78,22 @@ public class DefaultLimitSpec implements LimitSpec
List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
) )
{ {
// Materialize the Comparator first for fast-fail error checking. if (columns.isEmpty()) {
final Comparator<Row> comparator = makeComparator(dimensions, aggs, postAggs); return new LimitingFn(limit);
}
return new Function<Sequence<Row>, Sequence<Row>>() // Materialize the Comparator first for fast-fail error checking.
{ final Ordering<Row> ordering = makeComparator(dimensions, aggs, postAggs);
@Override
public Sequence<Row> apply(Sequence<Row> input) if (limit == Integer.MAX_VALUE) {
{ return new SortingFn(ordering);
return Sequences.limit(Sequences.sort(input, comparator), limit); }
} else {
}; return new TopNFunction(ordering, limit);
}
} }
private Comparator<Row> makeComparator( private Ordering<Row> makeComparator(
List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
) )
{ {
@ -174,4 +179,57 @@ public class DefaultLimitSpec implements LimitSpec
", limit=" + limit + ", limit=" + limit +
'}'; '}';
} }
private static class LimitingFn implements Function<Sequence<Row>, Sequence<Row>>
{
private int limit;
public LimitingFn(int limit)
{
this.limit = limit;
}
@Override
public Sequence<Row> apply(
@Nullable Sequence<Row> input
)
{
return Sequences.limit(input, limit);
}
}
private static class SortingFn implements Function<Sequence<Row>, Sequence<Row>>
{
private final Ordering<Row> ordering;
public SortingFn(Ordering<Row> ordering) {this.ordering = ordering;}
@Override
public Sequence<Row> apply(@Nullable Sequence<Row> input)
{
return Sequences.sort(input, ordering);
}
}
private static class TopNFunction implements Function<Sequence<Row>, Sequence<Row>>
{
private final TopNSorter<Row> sorter;
private final int limit;
public TopNFunction(Ordering<Row> ordering, int limit)
{
this.limit = limit;
this.sorter = new TopNSorter<Row>(ordering);
}
@Override
public Sequence<Row> apply(
@Nullable Sequence<Row> input
)
{
final ArrayList<Row> materializedList = Sequences.toList(input, Lists.<Row>newArrayList());
return Sequences.simple(sorter.toTopN(materializedList, limit));
}
}
} }

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package com.metamx.druid.query.group.limit; package com.metamx.druid.query.group.orderby;
import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;

View File

@ -1,4 +1,4 @@
package com.metamx.druid.query.group.limit; package com.metamx.druid.query.group.orderby;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Functions; import com.google.common.base.Functions;

View File

@ -1,12 +1,17 @@
package com.metamx.druid.query.group.limit; package com.metamx.druid.query.group.orderby;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
@ -52,6 +57,46 @@ public class OrderByColumnSpec
} }
} }
public static OrderByColumnSpec asc(String dimension)
{
return new OrderByColumnSpec(dimension, Direction.ASCENDING);
}
public static List<OrderByColumnSpec> ascending(String... dimension)
{
return Lists.transform(
Arrays.asList(dimension),
new Function<String, OrderByColumnSpec>()
{
@Override
public OrderByColumnSpec apply(@Nullable String input)
{
return asc(input);
}
}
);
}
public static OrderByColumnSpec desc(String dimension)
{
return new OrderByColumnSpec(dimension, Direction.DESCENDING);
}
public static List<OrderByColumnSpec> descending(String... dimension)
{
return Lists.transform(
Arrays.asList(dimension),
new Function<String, OrderByColumnSpec>()
{
@Override
public OrderByColumnSpec apply(@Nullable String input)
{
return desc(input);
}
}
);
}
public OrderByColumnSpec( public OrderByColumnSpec(
String dimension, String dimension,
Direction direction Direction direction

View File

@ -0,0 +1,43 @@
package com.metamx.druid.query.group.orderby;
import com.google.common.collect.MinMaxPriorityQueue;
import java.util.Iterator;
/**
* Utility class that supports iterating a priority queue in sorted order.
*/
class OrderedPriorityQueueItems<T> implements Iterable<T>
{
private MinMaxPriorityQueue<T> rows;
public OrderedPriorityQueueItems(MinMaxPriorityQueue<T> rows)
{
this.rows = rows;
}
@Override
public Iterator<T> iterator()
{
return new Iterator<T>() {
@Override
public boolean hasNext()
{
return !rows.isEmpty();
}
@Override
public T next()
{
return rows.poll();
}
@Override
public void remove()
{
throw new UnsupportedOperationException("Can't remove any item from an intermediary heap for orderBy/limit");
}
};
}
}

View File

@ -0,0 +1,40 @@
package com.metamx.druid.query.group.orderby;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
/**
* A utility class that sorts a list of comparable items in the given order, and keeps only the
* top N sorted items.
*/
public class TopNSorter<T>
{
private Ordering<T> ordering;
/**
* Constructs a sorter that will sort items with given ordering.
* @param ordering the order that this sorter instance will use for sorting
*/
public TopNSorter(Ordering<T> ordering)
{
this.ordering = ordering;
}
/**
* Sorts a list of rows and retain the top n items
* @param items the collections of items to be sorted
* @param n the number of items to be retained
* @return Top n items that are sorted in the order specified when this instance is constructed.
*/
public Iterable<T> toTopN(Iterable<T> items, int n)
{
if(n <= 0) {
return ImmutableList.of();
}
MinMaxPriorityQueue<T> queue = MinMaxPriorityQueue.orderedBy(ordering).maximumSize(n).create(items);
return new OrderedPriorityQueueItems<T>(queue);
}
}

View File

@ -0,0 +1,121 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.client.selector;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.druid.Druids;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DirectDruidClient;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.query.ReflectionQueryToolChestWarehouse;
import com.metamx.druid.query.timeboundary.TimeBoundaryQuery;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.RequestBuilder;
import junit.framework.Assert;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Before;
import org.junit.Test;
import java.net.URL;
/**
*/
public class ServerSelectorTest
{
private HttpClient httpClient;
@Before
public void setUp() throws Exception
{
httpClient = EasyMock.createMock(HttpClient.class);
}
@Test
public void testPick() throws Exception
{
RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com"));
EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(requestBuilder).atLeastOnce();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(SettableFuture.create()).atLeastOnce();
EasyMock.replay(httpClient);
final ServerSelector serverSelector = new ServerSelector(
new DataSegment(
"test",
new Interval("2013-01-01/2013-01-02"),
new DateTime("2013-01-01").toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
0L
)
);
DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
new DefaultObjectMapper(new SmileFactory()),
httpClient,
"foo"
);
DirectDruidClient client2 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
new DefaultObjectMapper(new SmileFactory()),
httpClient,
"foo2"
);
QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
null,
client1
);
serverSelector.addServer(queryableDruidServer1);
QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
null,
client2
);
serverSelector.addServer(queryableDruidServer2);
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
client1.run(query);
client1.run(query);
client1.run(query);
Assert.assertTrue(client1.getNumOpenConnections() == 3);
client2.run(query);
client2.run(query);
Assert.assertTrue(client2.getNumOpenConnections() == 2);
Assert.assertTrue(serverSelector.pick() == queryableDruidServer2);
EasyMock.verify(httpClient);
}
}

View File

@ -0,0 +1,227 @@
package com.metamx.druid.query.group.having;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.input.Row;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class HavingSpecTest
{
private static final Row ROW = new MapBasedInputRow(0, new ArrayList<String>(), ImmutableMap.of("metric", (Object)Float.valueOf(10)));
@Test
public void testHavingClauseSerde() throws Exception {
List<HavingSpec> havings = Arrays.asList(
new GreaterThanHavingSpec("agg", Double.valueOf(1.3)),
new OrHavingSpec(
Arrays.asList(
new LessThanHavingSpec("lessAgg", Long.valueOf(1L)),
new NotHavingSpec(new EqualToHavingSpec("equalAgg", Double.valueOf(2)))
)
)
);
HavingSpec andHavingSpec = new AndHavingSpec(havings);
Map<String, Object> notMap = ImmutableMap.<String, Object>of(
"type", "not",
"havingSpec", ImmutableMap.of("type", "equalTo", "aggregation", "equalAgg", "value", 2.0)
);
Map<String, Object> lessMap = ImmutableMap.<String, Object>of(
"type", "lessThan",
"aggregation", "lessAgg",
"value", 1
);
Map<String, Object> greaterMap = ImmutableMap.<String, Object>of(
"type", "greaterThan",
"aggregation", "agg",
"value", 1.3
);
Map<String, Object> orMap = ImmutableMap.<String, Object>of(
"type", "or",
"havingSpecs", ImmutableList.of(lessMap, notMap)
);
Map<String, Object> payloadMap = ImmutableMap.<String, Object>of(
"type", "and",
"havingSpecs", ImmutableList.of(greaterMap, orMap)
);
ObjectMapper mapper = new DefaultObjectMapper();
assertEquals(andHavingSpec, mapper.convertValue(payloadMap, AndHavingSpec.class));
}
@Test
public void testGreaterThanHavingSpec() {
GreaterThanHavingSpec spec = new GreaterThanHavingSpec("metric", 10.003);
assertFalse(spec.eval(ROW));
spec = new GreaterThanHavingSpec("metric", 10);
assertFalse(spec.eval(ROW));
spec = new GreaterThanHavingSpec("metric", 9);
assertTrue(spec.eval(ROW));
}
private MapBasedInputRow makeRow(long ts, String dim, int value)
{
List<String> dimensions = Lists.newArrayList(dim);
Map<String, Object> metrics = ImmutableMap.of("metric", (Object) Float.valueOf(value));
return new MapBasedInputRow(ts, dimensions, metrics);
}
@Test
public void testLessThanHavingSpec() {
LessThanHavingSpec spec = new LessThanHavingSpec("metric", 10);
assertFalse(spec.eval(ROW));
spec = new LessThanHavingSpec("metric", 11);
assertTrue(spec.eval(ROW));
spec = new LessThanHavingSpec("metric", 9);
assertFalse(spec.eval(ROW));
}
@Test
public void testEqualHavingSpec() {
EqualToHavingSpec spec = new EqualToHavingSpec("metric", 10);
assertTrue(spec.eval(ROW));
spec = new EqualToHavingSpec("metric", 9);
assertFalse(spec.eval(ROW));
spec = new EqualToHavingSpec("metric", 11);
assertFalse(spec.eval(ROW));
}
private static class CountingHavingSpec implements HavingSpec {
private final AtomicInteger counter;
private final boolean value;
private CountingHavingSpec(AtomicInteger counter, boolean value)
{
this.counter = counter;
this.value = value;
}
@Override
public boolean eval(Row row)
{
counter.incrementAndGet();
return value;
}
}
@Test
public void testAndHavingSpecShouldSupportShortcutEvaluation () {
AtomicInteger counter = new AtomicInteger(0);
AndHavingSpec spec = new AndHavingSpec(ImmutableList.of(
(HavingSpec)new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, false)
));
spec.eval(ROW);
assertEquals(2, counter.get());
}
@Test
public void testAndHavingSpec () {
AtomicInteger counter = new AtomicInteger(0);
AndHavingSpec spec = new AndHavingSpec(ImmutableList.of(
(HavingSpec)new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true)
));
spec.eval(ROW);
assertEquals(4, counter.get());
counter.set(0);
spec = new AndHavingSpec(ImmutableList.of(
(HavingSpec)new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true)
));
spec.eval(ROW);
assertEquals(1, counter.get());
}
@Test
public void testOrHavingSpecSupportsShortcutEvaluation() {
AtomicInteger counter = new AtomicInteger(0);
OrHavingSpec spec = new OrHavingSpec(ImmutableList.of(
(HavingSpec)new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, false)
));
spec.eval(ROW);
assertEquals(1, counter.get());
}
@Test
public void testOrHavingSpec () {
AtomicInteger counter = new AtomicInteger(0);
OrHavingSpec spec = new OrHavingSpec(ImmutableList.of(
(HavingSpec)new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, false)
));
spec.eval(ROW);
assertEquals(4, counter.get());
counter.set(0);
spec = new OrHavingSpec(ImmutableList.of(
(HavingSpec)new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, true)
));
spec.eval(ROW);
assertEquals(4, counter.get());
}
@Test
public void testNotHavingSepc() {
NotHavingSpec spec = new NotHavingSpec(HavingSpec.NEVER);
assertTrue(spec.eval(ROW));
spec = new NotHavingSpec(HavingSpec.ALWAYS);
assertFalse(spec.eval(ROW));
}
}

View File

@ -0,0 +1,79 @@
package com.metamx.druid.query.group.orderby;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
@RunWith(Parameterized.class)
public class TopNSorterTest
{
private static final long SEED = 2L;
private static final Ordering<String> ASC = Ordering.natural();
private static final Ordering<String> DESC = Ordering.natural().reverse();
private static final List<String> EMPTY = Collections.EMPTY_LIST;
private static final List<String> SINGLE = Lists.newArrayList("a");
private static final List<String> RAW_ASC = Lists.newArrayList(Splitter.fixedLength(1).split("abcdefghijk"));
private static final List<String> RAW_DESC = Lists.newArrayList(Splitter.fixedLength(1).split("kjihgfedcba"));
private Ordering<String> ordering;
private List<String> rawInput;
private int limit;
@Parameterized.Parameters
public static Collection<Object[]> makeTestData(){
Object[][] data = new Object[][] {
{ ASC, RAW_ASC, RAW_ASC.size() - 2},
{ ASC, RAW_ASC, RAW_ASC.size()},
{ ASC, RAW_ASC, RAW_ASC.size() + 2},
{ ASC, RAW_ASC, 0},
{ ASC, SINGLE, 0},
{ ASC, SINGLE, 1},
{ ASC, SINGLE, 2},
{ ASC, SINGLE, 3},
{ ASC, EMPTY, 0},
{ ASC, EMPTY, 1},
{ DESC, RAW_DESC, RAW_DESC.size() - 2},
{ DESC, RAW_DESC, RAW_DESC.size()},
{ DESC, RAW_DESC, RAW_DESC.size() + 2},
{ DESC, RAW_DESC, 0},
{ DESC, RAW_DESC, 0},
{ DESC, SINGLE, 1},
{ DESC, SINGLE, 2},
{ DESC, SINGLE, 3},
{ DESC, EMPTY, 0},
{ DESC, EMPTY, 1},
};
return Arrays.asList(data);
}
public TopNSorterTest(Ordering<String> ordering, List<String> rawInput, int limit){
this.ordering = ordering;
this.rawInput = rawInput;
this.limit = limit;
}
@Test
public void testOrderByWithLimit()
{
List<String> expected = rawInput.subList(0, Math.min(limit, rawInput.size()));
List<String> inputs = Lists.newArrayList(rawInput);
Collections.shuffle(inputs, new Random(2));
Iterable<String> result = new TopNSorter<String>(ordering).toTopN(inputs, limit);
Assert.assertEquals(expected, Lists.newArrayList(result));
}
}

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version> <version>0.4.15-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -181,7 +181,7 @@ public class DbConnector
dataSource.setPassword(config.getDatabasePassword()); dataSource.setPassword(config.getDatabasePassword());
dataSource.setUrl(config.getDatabaseConnectURI()); dataSource.setUrl(config.getDatabaseConnectURI());
if (config.isValidationQuery()) { if (config.useValidationQuery()) {
dataSource.setValidationQuery(config.getValidationQuery()); dataSource.setValidationQuery(config.getValidationQuery());
dataSource.setTestOnBorrow(true); dataSource.setTestOnBorrow(true);
} }

View File

@ -44,7 +44,7 @@ public abstract class DbConnectorConfig
@JsonProperty("useValidationQuery") @JsonProperty("useValidationQuery")
@Config("druid.database.validation") @Config("druid.database.validation")
public boolean isValidationQuery() { public boolean useValidationQuery() {
return false; return false;
} }

View File

@ -9,7 +9,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version> <version>0.4.15-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -29,7 +29,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version> <version>0.4.15-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -74,9 +74,9 @@ public class ColumnDescriptor
return parts; return parts;
} }
public int numBytes() public long numBytes()
{ {
int retVal = 0; long retVal = 0;
for (ColumnPartSerde part : parts) { for (ColumnPartSerde part : parts) {
retVal += part.numBytes(); retVal += part.numBytes();

View File

@ -38,7 +38,7 @@ import java.nio.channels.WritableByteChannel;
}) })
public interface ColumnPartSerde public interface ColumnPartSerde
{ {
public int numBytes(); public long numBytes();
public void write(WritableByteChannel channel) throws IOException; public void write(WritableByteChannel channel) throws IOException;
public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder); public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder);
} }

View File

@ -61,7 +61,7 @@ public class ComplexColumnPartSerde implements ColumnPartSerde
} }
@Override @Override
public int numBytes() public long numBytes()
{ {
return column.getSerializedSize(); return column.getSerializedSize();
} }

View File

@ -54,7 +54,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
private final GenericIndexed<ImmutableConciseSet> bitmaps; private final GenericIndexed<ImmutableConciseSet> bitmaps;
private final ImmutableRTree spatialIndex; private final ImmutableRTree spatialIndex;
private final int size; private final long size;
public DictionaryEncodedColumnPartSerde( public DictionaryEncodedColumnPartSerde(
GenericIndexed<String> dictionary, GenericIndexed<String> dictionary,
@ -70,7 +70,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
this.bitmaps = bitmaps; this.bitmaps = bitmaps;
this.spatialIndex = spatialIndex; this.spatialIndex = spatialIndex;
int size = dictionary.getSerializedSize(); long size = dictionary.getSerializedSize();
if (singleValCol != null && multiValCol == null) { if (singleValCol != null && multiValCol == null) {
size += singleValCol.getSerializedSize(); size += singleValCol.getSerializedSize();
} else if (singleValCol == null && multiValCol != null) { } else if (singleValCol == null && multiValCol != null) {
@ -103,7 +103,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
} }
@Override @Override
public int numBytes() public long numBytes()
{ {
return 1 + size; return 1 + size;
} }

View File

@ -58,7 +58,7 @@ public class FloatGenericColumnPartSerde implements ColumnPartSerde
} }
@Override @Override
public int numBytes() public long numBytes()
{ {
return compressedFloats.getSerializedSize(); return compressedFloats.getSerializedSize();
} }

View File

@ -58,7 +58,7 @@ public class LongGenericColumnPartSerde implements ColumnPartSerde
} }
@Override @Override
public int numBytes() public long numBytes()
{ {
return compressedLongs.getSerializedSize(); return compressedLongs.getSerializedSize();
} }

View File

@ -150,7 +150,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
}; };
} }
public int getSerializedSize() public long getSerializedSize()
{ {
return baseFloatBuffers.getSerializedSize() + 1 + 4 + 4; return baseFloatBuffers.getSerializedSize() + 1 + 4 + 4;
} }

View File

@ -161,7 +161,7 @@ public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
}; };
} }
public int getSerializedSize() public long getSerializedSize()
{ {
return baseLongBuffers.getSerializedSize() + 1 + 4 + 4; return baseLongBuffers.getSerializedSize() + 1 + 4 + 4;
} }

View File

@ -663,7 +663,7 @@ public class IndexIO
GenericIndexed<String> cols = GenericIndexed.fromIterable(columns, GenericIndexed.stringStrategy); GenericIndexed<String> cols = GenericIndexed.fromIterable(columns, GenericIndexed.stringStrategy);
final int numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16; final long numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16;
final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes); final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);
cols.writeToChannel(writer); cols.writeToChannel(writer);
dims9.writeToChannel(writer); dims9.writeToChannel(writer);

View File

@ -207,7 +207,7 @@ public class GenericIndexed<T> implements Indexed<T>
return -(minIndex + 1); return -(minIndex + 1);
} }
public int getSerializedSize() public long getSerializedSize()
{ {
return theBuffer.remaining() + 2 + 4 + 4; return theBuffer.remaining() + 2 + 4 + 4;
} }

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version> <version>0.4.15-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
@ -120,12 +120,6 @@
<outputFile> <outputFile>
${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar ${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar
</outputFile> </outputFile>
<relocations>
<relocation>
<pattern>org.codehaus.jackson</pattern>
<shadedPattern>druid.org.codehaus.jackson</shadedPattern>
</relocation>
</relocations>
</configuration> </configuration>
</execution> </execution>
</executions> </executions>

View File

@ -504,7 +504,6 @@ public class DeterminePartitionsJob implements Jobby
public static class DeterminePartitionsDimSelectionReducer extends DeterminePartitionsDimSelectionBaseReducer public static class DeterminePartitionsDimSelectionReducer extends DeterminePartitionsDimSelectionBaseReducer
{ {
private static final double SHARD_COMBINE_THRESHOLD = 0.25; private static final double SHARD_COMBINE_THRESHOLD = 0.25;
private static final double SHARD_OVERSIZE_THRESHOLD = 1.5;
private static final int HIGH_CARDINALITY_THRESHOLD = 3000000; private static final int HIGH_CARDINALITY_THRESHOLD = 3000000;
@Override @Override
@ -672,7 +671,7 @@ public class DeterminePartitionsJob implements Jobby
// Make sure none of these shards are oversized // Make sure none of these shards are oversized
boolean oversized = false; boolean oversized = false;
for(final DimPartition partition : dimPartitions.partitions) { for(final DimPartition partition : dimPartitions.partitions) {
if(partition.rows > config.getTargetPartitionSize() * SHARD_OVERSIZE_THRESHOLD) { if(partition.rows > config.getMaxPartitionSize()) {
log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec); log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
oversized = true; oversized = true;
} }

View File

@ -236,7 +236,7 @@ public class HadoopDruidIndexerConfig
this.partitionsSpec = partitionsSpec; this.partitionsSpec = partitionsSpec;
} else { } else {
// Backwards compatibility // Backwards compatibility
this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, false); this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, null, false);
} }
if(granularitySpec != null) { if(granularitySpec != null) {
@ -431,6 +431,11 @@ public class HadoopDruidIndexerConfig
return partitionsSpec.getTargetPartitionSize(); return partitionsSpec.getTargetPartitionSize();
} }
public long getMaxPartitionSize()
{
return partitionsSpec.getMaxPartitionSize();
}
public boolean isUpdaterJobSpecSet() public boolean isUpdaterJobSpecSet()
{ {
return (updaterJobSpec != null); return (updaterJobSpec != null);

View File

@ -102,8 +102,6 @@ public class HadoopDruidIndexerJob implements Jobby
} }
} }
publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
if (!config.isLeaveIntermediate()) { if (!config.isLeaveIntermediate()) {
if (failedMessage == null || config.isCleanupOnFailure()) { if (failedMessage == null || config.isCleanupOnFailure()) {
Path workingPath = config.makeIntermediatePath(); Path workingPath = config.makeIntermediatePath();
@ -121,6 +119,8 @@ public class HadoopDruidIndexerJob implements Jobby
throw new ISE(failedMessage); throw new ISE(failedMessage);
} }
publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
return true; return true;
} }

View File

@ -8,22 +8,30 @@ import javax.annotation.Nullable;
public class PartitionsSpec public class PartitionsSpec
{ {
private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
@Nullable @Nullable
private final String partitionDimension; private final String partitionDimension;
private final long targetPartitionSize; private final long targetPartitionSize;
private final long maxPartitionSize;
private final boolean assumeGrouped; private final boolean assumeGrouped;
@JsonCreator @JsonCreator
public PartitionsSpec( public PartitionsSpec(
@JsonProperty("partitionDimension") @Nullable String partitionDimension, @JsonProperty("partitionDimension") @Nullable String partitionDimension,
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize, @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
) )
{ {
this.partitionDimension = partitionDimension; this.partitionDimension = partitionDimension;
this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize; this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
this.maxPartitionSize = maxPartitionSize == null
? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
: maxPartitionSize;
this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped; this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
} }
@ -46,6 +54,12 @@ public class PartitionsSpec
return targetPartitionSize; return targetPartitionSize;
} }
@JsonProperty
public long getMaxPartitionSize()
{
return maxPartitionSize;
}
@JsonProperty @JsonProperty
public boolean isAssumeGrouped() public boolean isAssumeGrouped()
{ {

View File

@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.druid.indexer.granularity.UniformGranularitySpec; import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
import com.metamx.druid.indexer.partitions.PartitionsSpec; import com.metamx.druid.indexer.partitions.PartitionsSpec;
import com.metamx.druid.indexer.updater.DbUpdaterJobSpec;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -39,8 +40,8 @@ public class HadoopDruidIndexerConfigTest
final HadoopDruidIndexerConfig cfg; final HadoopDruidIndexerConfig cfg;
try { try {
cfg = jsonMapper.readValue( cfg = jsonReadWriteRead(
"{" "{"
+ " \"granularitySpec\":{" + " \"granularitySpec\":{"
+ " \"type\":\"uniform\"," + " \"type\":\"uniform\","
+ " \"gran\":\"hour\"," + " \"gran\":\"hour\","
@ -74,7 +75,7 @@ public class HadoopDruidIndexerConfigTest
final HadoopDruidIndexerConfig cfg; final HadoopDruidIndexerConfig cfg;
try { try {
cfg = jsonMapper.readValue( cfg = jsonReadWriteRead(
"{" "{"
+ "\"segmentGranularity\":\"day\"," + "\"segmentGranularity\":\"day\","
+ "\"intervals\":[\"2012-02-01/P1D\"]" + "\"intervals\":[\"2012-02-01/P1D\"]"
@ -137,7 +138,7 @@ public class HadoopDruidIndexerConfigTest
public void testInvalidGranularityCombination() { public void testInvalidGranularityCombination() {
boolean thrown = false; boolean thrown = false;
try { try {
final HadoopDruidIndexerConfig cfg = jsonMapper.readValue( final HadoopDruidIndexerConfig cfg = jsonReadWriteRead(
"{" "{"
+ "\"segmentGranularity\":\"day\"," + "\"segmentGranularity\":\"day\","
+ "\"intervals\":[\"2012-02-01/P1D\"]," + "\"intervals\":[\"2012-02-01/P1D\"],"
@ -161,7 +162,7 @@ public class HadoopDruidIndexerConfigTest
final HadoopDruidIndexerConfig cfg; final HadoopDruidIndexerConfig cfg;
try { try {
cfg = jsonMapper.readValue( cfg = jsonReadWriteRead(
"{}", "{}",
HadoopDruidIndexerConfig.class HadoopDruidIndexerConfig.class
); );
@ -183,7 +184,7 @@ public class HadoopDruidIndexerConfigTest
final HadoopDruidIndexerConfig cfg; final HadoopDruidIndexerConfig cfg;
try { try {
cfg = jsonMapper.readValue( cfg = jsonReadWriteRead(
"{" "{"
+ "\"partitionsSpec\":{" + "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100" + " \"targetPartitionSize\":100"
@ -221,7 +222,7 @@ public class HadoopDruidIndexerConfigTest
final HadoopDruidIndexerConfig cfg; final HadoopDruidIndexerConfig cfg;
try { try {
cfg = jsonMapper.readValue( cfg = jsonReadWriteRead(
"{" "{"
+ "\"partitionsSpec\":{" + "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100," + " \"targetPartitionSize\":100,"
@ -248,6 +249,12 @@ public class HadoopDruidIndexerConfigTest
100 100
); );
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
150
);
Assert.assertEquals( Assert.assertEquals(
"getPartitionDimension", "getPartitionDimension",
partitionsSpec.getPartitionDimension(), partitionsSpec.getPartitionDimension(),
@ -260,7 +267,7 @@ public class HadoopDruidIndexerConfigTest
final HadoopDruidIndexerConfig cfg; final HadoopDruidIndexerConfig cfg;
try { try {
cfg = jsonMapper.readValue( cfg = jsonReadWriteRead(
"{" "{"
+ "\"targetPartitionSize\":100," + "\"targetPartitionSize\":100,"
+ "\"partitionDimension\":\"foo\"" + "\"partitionDimension\":\"foo\""
@ -285,6 +292,58 @@ public class HadoopDruidIndexerConfigTest
100 100
); );
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
150
);
Assert.assertEquals(
"getPartitionDimension",
partitionsSpec.getPartitionDimension(),
"foo"
);
}
@Test
public void testPartitionsSpecMaxPartitionSize() {
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100,"
+ " \"maxPartitionSize\":200,"
+ " \"partitionDimension\":\"foo\""
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
200
);
Assert.assertEquals( Assert.assertEquals(
"getPartitionDimension", "getPartitionDimension",
partitionsSpec.getPartitionDimension(), partitionsSpec.getPartitionDimension(),
@ -296,7 +355,7 @@ public class HadoopDruidIndexerConfigTest
public void testInvalidPartitionsCombination() { public void testInvalidPartitionsCombination() {
boolean thrown = false; boolean thrown = false;
try { try {
final HadoopDruidIndexerConfig cfg = jsonMapper.readValue( final HadoopDruidIndexerConfig cfg = jsonReadWriteRead(
"{" "{"
+ "\"targetPartitionSize\":100," + "\"targetPartitionSize\":100,"
+ "\"partitionsSpec\":{" + "\"partitionsSpec\":{"
@ -311,4 +370,40 @@ public class HadoopDruidIndexerConfigTest
Assert.assertTrue("Exception thrown", thrown); Assert.assertTrue("Exception thrown", thrown);
} }
@Test
public void testDbUpdaterJobSpec() throws Exception
{
final HadoopDruidIndexerConfig cfg;
cfg = jsonReadWriteRead(
"{"
+ "\"updaterJobSpec\":{\n"
+ " \"type\" : \"db\",\n"
+ " \"connectURI\" : \"jdbc:mysql://localhost/druid\",\n"
+ " \"user\" : \"rofl\",\n"
+ " \"password\" : \"p4ssw0rd\",\n"
+ " \"segmentTable\" : \"segments\"\n"
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
final DbUpdaterJobSpec spec = (DbUpdaterJobSpec) cfg.getUpdaterJobSpec();
Assert.assertEquals("segments", spec.getSegmentTable());
Assert.assertEquals("jdbc:mysql://localhost/druid", spec.getDatabaseConnectURI());
Assert.assertEquals("rofl", spec.getDatabaseUser());
Assert.assertEquals("p4ssw0rd", spec.getDatabasePassword());
Assert.assertEquals(false, spec.useValidationQuery());
}
private <T> T jsonReadWriteRead(String s, Class<T> klass)
{
try {
return jsonMapper.readValue(jsonMapper.writeValueAsBytes(jsonMapper.readValue(s, klass)), klass);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
} }

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version> <version>0.4.15-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -25,6 +25,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.MinMaxPriorityQueue;
@ -230,6 +231,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
return null; return null;
} }
public boolean isWorkerRunningTask(String workerHost, String taskId)
{
ZkWorker zkWorker = zkWorkers.get(workerHost);
return (zkWorker != null && zkWorker.getRunningTasks().contains(taskId));
}
/** /**
* A task will be run only if there is no current knowledge in the RemoteTaskRunner of the task. * A task will be run only if there is no current knowledge in the RemoteTaskRunner of the task.
* *
@ -350,6 +358,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
/** /**
* Adds a task to the pending queue * Adds a task to the pending queue
*
* @param taskRunnerWorkItem * @param taskRunnerWorkItem
*/ */
private void addPendingTask(final TaskRunnerWorkItem taskRunnerWorkItem) private void addPendingTask(final TaskRunnerWorkItem taskRunnerWorkItem)
@ -489,25 +498,36 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes()); throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
} }
cf.create() String taskPath = JOINER.join(config.getIndexerTaskPath(), theWorker.getHost(), task.getId());
.withMode(CreateMode.EPHEMERAL)
.forPath( if (cf.checkExists().forPath(taskPath) == null) {
JOINER.join( cf.create()
config.getIndexerTaskPath(), .withMode(CreateMode.EPHEMERAL)
theWorker.getHost(), .forPath(
task.getId() taskPath, rawBytes
), );
rawBytes }
);
runningTasks.put(task.getId(), pendingTasks.remove(task.getId())); runningTasks.put(task.getId(), pendingTasks.remove(task.getId()));
log.info("Task %s switched from pending to running", task.getId()); log.info("Task %s switched from pending to running", task.getId());
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
// on a worker - this avoids overflowing a worker with tasks // on a worker - this avoids overflowing a worker with tasks
Stopwatch timeoutStopwatch = new Stopwatch();
timeoutStopwatch.start();
synchronized (statusLock) { synchronized (statusLock) {
while (findWorkerRunningTask(task.getId()) == null) { while (!isWorkerRunningTask(theWorker.getHost(), task.getId())) {
statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis()); statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis());
if (timeoutStopwatch.elapsedMillis() >= config.getTaskAssignmentTimeoutDuration().getMillis()) {
log.error(
"Something went wrong! %s never ran task %s after %s!",
theWorker.getHost(),
task.getId(),
config.getTaskAssignmentTimeoutDuration()
);
retryTask(runningTasks.get(task.getId()), theWorker.getHost());
break;
}
} }
} }
} }
@ -687,7 +707,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
); );
if (workerQueue.isEmpty()) { if (workerQueue.isEmpty()) {
log.info("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
return null; return null;
} }

View File

@ -28,7 +28,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.Query; import com.metamx.druid.Query;
import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskStatus;
@ -51,20 +50,19 @@ import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
/** /**
* Runs tasks in a JVM thread using an ExecutorService. * Runs tasks in a JVM thread using an ExecutorService.
*/ */
public class ExecutorServiceTaskRunner implements TaskRunner, QuerySegmentWalker public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
{ {
private final TaskToolboxFactory toolboxFactory; private final TaskToolboxFactory toolboxFactory;
private final ListeningExecutorService exec; private final ListeningExecutorService exec;
private final Set<TaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<TaskRunnerWorkItem>(); private final Set<TaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<TaskRunnerWorkItem>();
private static final EmittingLogger log = new EmittingLogger(ExecutorServiceTaskRunner.class); private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class);
public ExecutorServiceTaskRunner( public ThreadPoolTaskRunner(
TaskToolboxFactory toolboxFactory, TaskToolboxFactory toolboxFactory,
ExecutorService exec ExecutorService exec
) )

View File

@ -59,7 +59,7 @@ import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory; import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.merger.common.index.ChatHandlerProvider; import com.metamx.druid.merger.common.index.ChatHandlerProvider;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner; import com.metamx.druid.merger.coordinator.ThreadPoolTaskRunner;
import com.metamx.druid.merger.worker.config.ChatHandlerProviderConfig; import com.metamx.druid.merger.worker.config.ChatHandlerProviderConfig;
import com.metamx.druid.merger.worker.config.WorkerConfig; import com.metamx.druid.merger.worker.config.WorkerConfig;
import com.metamx.druid.utils.PropUtils; import com.metamx.druid.utils.PropUtils;
@ -118,7 +118,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
private ServiceAnnouncer serviceAnnouncer = null; private ServiceAnnouncer serviceAnnouncer = null;
private ServiceProvider coordinatorServiceProvider = null; private ServiceProvider coordinatorServiceProvider = null;
private Server server = null; private Server server = null;
private ExecutorServiceTaskRunner taskRunner = null; private ThreadPoolTaskRunner taskRunner = null;
private ExecutorLifecycle executorLifecycle = null; private ExecutorLifecycle executorLifecycle = null;
private ChatHandlerProvider chatHandlerProvider = null; private ChatHandlerProvider chatHandlerProvider = null;
@ -247,7 +247,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
executorLifecycle.join(); executorLifecycle.join();
} }
public ExecutorServiceTaskRunner getTaskRunner() public ThreadPoolTaskRunner getTaskRunner()
{ {
return taskRunner; return taskRunner;
} }
@ -414,7 +414,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
{ {
if (taskRunner == null) { if (taskRunner == null) {
this.taskRunner = lifecycle.addManagedInstance( this.taskRunner = lifecycle.addManagedInstance(
new ExecutorServiceTaskRunner( new ThreadPoolTaskRunner(
taskToolboxFactory, taskToolboxFactory,
Executors.newSingleThreadExecutor( Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder() new ThreadFactoryBuilder()

View File

@ -291,7 +291,7 @@ public class RemoteTaskRunnerTest
new PathChildrenCache(cf, String.format("%s/worker1", tasksPath), true), new PathChildrenCache(cf, String.format("%s/worker1", tasksPath), true),
cf, cf,
workerCuratorCoordinator, workerCuratorCoordinator,
new ExecutorServiceTaskRunner( new ThreadPoolTaskRunner(
new TaskToolboxFactory( new TaskToolboxFactory(
new TaskConfig() new TaskConfig()
{ {

View File

@ -162,7 +162,7 @@ public class TaskLifecycleTest
new DefaultObjectMapper() new DefaultObjectMapper()
); );
tr = new ExecutorServiceTaskRunner( tr = new ThreadPoolTaskRunner(
tb, tb,
Executors.newSingleThreadExecutor() Executors.newSingleThreadExecutor()
); );

12
pom.xml
View File

@ -23,7 +23,7 @@
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>
<version>0.4.9-SNAPSHOT</version> <version>0.4.15-SNAPSHOT</version>
<name>druid</name> <name>druid</name>
<description>druid</description> <description>druid</description>
<scm> <scm>
@ -214,16 +214,6 @@
<artifactId>jackson-jaxrs-json-provider</artifactId> <artifactId>jackson-jaxrs-json-provider</artifactId>
<version>2.1.4</version> <version>2.1.4</version>
</dependency> </dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.11</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.11</version>
</dependency>
<dependency> <dependency>
<groupId>javax.inject</groupId> <groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId> <artifactId>javax.inject</artifactId>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version> <version>0.4.15-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version> <version>0.4.15-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -20,6 +20,7 @@
package com.metamx.druid.coordination; package com.metamx.druid.coordination;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
@ -257,6 +258,9 @@ public class ServerManager implements QuerySegmentWalker
); );
} }
} }
)
.filter(
Predicates.<QueryRunner<T>>notNull()
); );
return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest); return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest);

View File

@ -69,7 +69,7 @@ public class TestHelper
assertResult(failMsg, expectedNext, next); assertResult(failMsg, expectedNext, next);
assertResult( assertResult(
String.format("%sSecond iterator bad, multiple calls to iterator() should be safe", failMsg), String.format("%s: Second iterator bad, multiple calls to iterator() should be safe", failMsg),
expectedNext, expectedNext,
next2 next2
); );
@ -77,20 +77,20 @@ public class TestHelper
if (resultsIter.hasNext()) { if (resultsIter.hasNext()) {
Assert.fail( Assert.fail(
String.format("%sExpected resultsIter to be exhausted, next element was %s", failMsg, resultsIter.next()) String.format("%s: Expected resultsIter to be exhausted, next element was %s", failMsg, resultsIter.next())
); );
} }
if (resultsIter2.hasNext()) { if (resultsIter2.hasNext()) {
Assert.fail( Assert.fail(
String.format("%sExpected resultsIter2 to be exhausted, next element was %s", failMsg, resultsIter.next()) String.format("%s: Expected resultsIter2 to be exhausted, next element was %s", failMsg, resultsIter.next())
); );
} }
if (expectedResultsIter.hasNext()) { if (expectedResultsIter.hasNext()) {
Assert.fail( Assert.fail(
String.format( String.format(
"%sExpected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next() "%s: Expected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next()
) )
); );
} }
@ -109,7 +109,7 @@ public class TestHelper
Assert.assertEquals(failMsg, expectedNext, next); Assert.assertEquals(failMsg, expectedNext, next);
Assert.assertEquals( Assert.assertEquals(
String.format("%sSecond iterator bad, multiple calls to iterator() should be safe", failMsg), String.format("%s: Second iterator bad, multiple calls to iterator() should be safe", failMsg),
expectedNext, expectedNext,
next2 next2
); );
@ -117,20 +117,20 @@ public class TestHelper
if (resultsIter.hasNext()) { if (resultsIter.hasNext()) {
Assert.fail( Assert.fail(
String.format("%sExpected resultsIter to be exhausted, next element was %s", failMsg, resultsIter.next()) String.format("%s: Expected resultsIter to be exhausted, next element was %s", failMsg, resultsIter.next())
); );
} }
if (resultsIter2.hasNext()) { if (resultsIter2.hasNext()) {
Assert.fail( Assert.fail(
String.format("%sExpected resultsIter2 to be exhausted, next element was %s", failMsg, resultsIter.next()) String.format("%s: Expected resultsIter2 to be exhausted, next element was %s", failMsg, resultsIter.next())
); );
} }
if (expectedResultsIter.hasNext()) { if (expectedResultsIter.hasNext()) {
Assert.fail( Assert.fail(
String.format( String.format(
"%sExpected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next() "%s: Expected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next()
) )
); );
} }

View File

@ -22,9 +22,11 @@ package com.metamx.druid.query.group;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.druid.PeriodGranularity; import com.metamx.druid.PeriodGranularity;
@ -41,7 +43,12 @@ import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerTestHelper; import com.metamx.druid.query.QueryRunnerTestHelper;
import com.metamx.druid.query.dimension.DefaultDimensionSpec; import com.metamx.druid.query.dimension.DefaultDimensionSpec;
import com.metamx.druid.query.dimension.DimensionSpec; import com.metamx.druid.query.dimension.DimensionSpec;
import com.metamx.druid.query.group.limit.OrderByColumnSpec; import com.metamx.druid.query.group.having.EqualToHavingSpec;
import com.metamx.druid.query.group.having.GreaterThanHavingSpec;
import com.metamx.druid.query.group.having.OrHavingSpec;
import com.metamx.druid.query.group.orderby.DefaultLimitSpec;
import com.metamx.druid.query.group.orderby.LimitSpec;
import com.metamx.druid.query.group.orderby.OrderByColumnSpec;
import com.metamx.druid.query.segment.MultipleIntervalSegmentSpec; import com.metamx.druid.query.segment.MultipleIntervalSegmentSpec;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
@ -56,6 +63,7 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -69,28 +77,30 @@ public class GroupByQueryRunnerTest
public static Collection<?> constructorFeeder() throws IOException public static Collection<?> constructorFeeder() throws IOException
{ {
final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
new GroupByQueryEngine( new GroupByQueryEngine(
new GroupByQueryEngineConfig() new GroupByQueryEngineConfig()
{ {
@Override @Override
public int getMaxIntermediateRows() public int getMaxIntermediateRows()
{ {
return 10000; return 10000;
} }
}, },
new StupidPool<ByteBuffer>( new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>() new Supplier<ByteBuffer>()
{ {
@Override @Override
public ByteBuffer get() public ByteBuffer get()
{ {
return ByteBuffer.allocate(1024 * 1024); return ByteBuffer.allocate(1024 * 1024);
} }
} }
) )
), ),
new GroupByQueryRunnerFactoryConfig(){} new GroupByQueryRunnerFactoryConfig()
); {
}
);
return Lists.newArrayList( return Lists.newArrayList(
Iterables.transform( Iterables.transform(
@ -106,13 +116,15 @@ public class GroupByQueryRunnerTest
); );
} }
public GroupByQueryRunnerTest(GroupByQueryRunnerFactory factory, QueryRunner runner) { public GroupByQueryRunnerTest(GroupByQueryRunnerFactory factory, QueryRunner runner)
{
this.factory = factory; this.factory = factory;
this.runner = runner; this.runner = runner;
} }
@Test @Test
public void testGroupBy() { public void testGroupBy()
{
GroupByQuery query = GroupByQuery GroupByQuery query = GroupByQuery
.builder() .builder()
.setDataSource(QueryRunnerTestHelper.dataSource) .setDataSource(QueryRunnerTestHelper.dataSource)
@ -127,35 +139,36 @@ public class GroupByQueryRunnerTest
.setGranularity(QueryRunnerTestHelper.dayGran) .setGranularity(QueryRunnerTestHelper.dayGran)
.build(); .build();
List<Row> expectedResults = Arrays.asList( List<Row> expectedResults = Arrays.asList(
createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L), createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L), createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L),
createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L), createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L),
createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L), createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L),
createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L), createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L),
createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L), createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L),
createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L), createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L),
createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L), createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L),
createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L), createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L),
createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L), createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L),
createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L), createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L),
createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L), createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L),
createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L), createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L),
createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L), createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L),
createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L), createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L),
createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L), createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L),
createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L) createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L)
); );
Iterable<Row> results = Sequences.toList(runner.run(query), Lists.<Row>newArrayList()); Iterable<Row> results = Sequences.toList(runner.run(query), Lists.<Row>newArrayList());
TestHelper.assertExpectedObjects(expectedResults, results, ""); TestHelper.assertExpectedObjects(expectedResults, results, "");
} }
@Test @Test
public void testGroupByWithTimeZone() { public void testGroupByWithTimeZone()
{
DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles"); DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
GroupByQuery query = GroupByQuery.builder() GroupByQuery query = GroupByQuery.builder()
@ -187,38 +200,39 @@ public class GroupByQueryRunnerTest
) )
.build(); .build();
List<Row> expectedResults = Arrays.asList( List<Row> expectedResults = Arrays.asList(
createExpectedRow(new DateTime("2011-03-31", tz), "alias", "automotive", "rows", 1L, "idx", 135L), createExpectedRow(new DateTime("2011-03-31", tz), "alias", "automotive", "rows", 1L, "idx", 135L),
createExpectedRow(new DateTime("2011-03-31", tz), "alias", "business", "rows", 1L, "idx", 118L), createExpectedRow(new DateTime("2011-03-31", tz), "alias", "business", "rows", 1L, "idx", 118L),
createExpectedRow(new DateTime("2011-03-31", tz), "alias", "entertainment", "rows", 1L, "idx", 158L), createExpectedRow(new DateTime("2011-03-31", tz), "alias", "entertainment", "rows", 1L, "idx", 158L),
createExpectedRow(new DateTime("2011-03-31", tz), "alias", "health", "rows", 1L, "idx", 120L), createExpectedRow(new DateTime("2011-03-31", tz), "alias", "health", "rows", 1L, "idx", 120L),
createExpectedRow(new DateTime("2011-03-31", tz), "alias", "mezzanine", "rows", 3L, "idx", 2870L), createExpectedRow(new DateTime("2011-03-31", tz), "alias", "mezzanine", "rows", 3L, "idx", 2870L),
createExpectedRow(new DateTime("2011-03-31", tz), "alias", "news", "rows", 1L, "idx", 121L), createExpectedRow(new DateTime("2011-03-31", tz), "alias", "news", "rows", 1L, "idx", 121L),
createExpectedRow(new DateTime("2011-03-31", tz), "alias", "premium", "rows", 3L, "idx", 2900L), createExpectedRow(new DateTime("2011-03-31", tz), "alias", "premium", "rows", 3L, "idx", 2900L),
createExpectedRow(new DateTime("2011-03-31", tz), "alias", "technology", "rows", 1L, "idx", 78L), createExpectedRow(new DateTime("2011-03-31", tz), "alias", "technology", "rows", 1L, "idx", 78L),
createExpectedRow(new DateTime("2011-03-31", tz), "alias", "travel", "rows", 1L, "idx", 119L), createExpectedRow(new DateTime("2011-03-31", tz), "alias", "travel", "rows", 1L, "idx", 119L),
createExpectedRow(new DateTime("2011-04-01", tz), "alias", "automotive", "rows", 1L, "idx", 147L), createExpectedRow(new DateTime("2011-04-01", tz), "alias", "automotive", "rows", 1L, "idx", 147L),
createExpectedRow(new DateTime("2011-04-01", tz), "alias", "business", "rows", 1L, "idx", 112L), createExpectedRow(new DateTime("2011-04-01", tz), "alias", "business", "rows", 1L, "idx", 112L),
createExpectedRow(new DateTime("2011-04-01", tz), "alias", "entertainment", "rows", 1L, "idx", 166L), createExpectedRow(new DateTime("2011-04-01", tz), "alias", "entertainment", "rows", 1L, "idx", 166L),
createExpectedRow(new DateTime("2011-04-01", tz), "alias", "health", "rows", 1L, "idx", 113L), createExpectedRow(new DateTime("2011-04-01", tz), "alias", "health", "rows", 1L, "idx", 113L),
createExpectedRow(new DateTime("2011-04-01", tz), "alias", "mezzanine", "rows", 3L, "idx", 2447L), createExpectedRow(new DateTime("2011-04-01", tz), "alias", "mezzanine", "rows", 3L, "idx", 2447L),
createExpectedRow(new DateTime("2011-04-01", tz), "alias", "news", "rows", 1L, "idx", 114L), createExpectedRow(new DateTime("2011-04-01", tz), "alias", "news", "rows", 1L, "idx", 114L),
createExpectedRow(new DateTime("2011-04-01", tz), "alias", "premium", "rows", 3L, "idx", 2505L), createExpectedRow(new DateTime("2011-04-01", tz), "alias", "premium", "rows", 3L, "idx", 2505L),
createExpectedRow(new DateTime("2011-04-01", tz), "alias", "technology", "rows", 1L, "idx", 97L), createExpectedRow(new DateTime("2011-04-01", tz), "alias", "technology", "rows", 1L, "idx", 97L),
createExpectedRow(new DateTime("2011-04-01", tz), "alias", "travel", "rows", 1L, "idx", 126L) createExpectedRow(new DateTime("2011-04-01", tz), "alias", "travel", "rows", 1L, "idx", 126L)
); );
Iterable<Row> results = Sequences.toList( Iterable<Row> results = Sequences.toList(
runner.run(query), runner.run(query),
Lists.<Row>newArrayList() Lists.<Row>newArrayList()
); );
TestHelper.assertExpectedObjects(expectedResults, results, ""); TestHelper.assertExpectedObjects(expectedResults, results, "");
} }
@Test @Test
public void testMergeResults() { public void testMergeResults()
{
GroupByQuery.Builder builder = GroupByQuery GroupByQuery.Builder builder = GroupByQuery
.builder() .builder()
.setDataSource(QueryRunnerTestHelper.dataSource) .setDataSource(QueryRunnerTestHelper.dataSource)
@ -282,7 +296,170 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(allGranExpectedResults, runner.run(allGranQuery), "direct"); TestHelper.assertExpectedObjects(allGranExpectedResults, runner.run(allGranQuery), "direct");
TestHelper.assertExpectedObjects(allGranExpectedResults, mergedRunner.run(allGranQuery), "merged"); TestHelper.assertExpectedObjects(allGranExpectedResults, mergedRunner.run(allGranQuery), "merged");
}
@Test
public void testMergeResultsWithLimit()
{
for (int limit = 1; limit < 20; ++limit) {
doTestMergeResultsWithValidLimit(limit);
}
}
private void doTestMergeResultsWithValidLimit(final int limit)
{
GroupByQuery.Builder builder = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval("2011-04-02/2011-04-04")
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
.setLimit(Integer.valueOf(limit));
final GroupByQuery fullQuery = builder.build();
List<Row> expectedResults = Arrays.asList(
createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 269L),
createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L),
createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 319L),
createExpectedRow("2011-04-01", "alias", "health", "rows", 2L, "idx", 216L),
createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
createExpectedRow("2011-04-01", "alias", "news", "rows", 2L, "idx", 221L),
createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L),
createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 177L),
createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L)
);
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest().mergeResults(runner);
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, limit), mergeRunner.run(fullQuery), String.format("limit: %d", limit)
);
}
@Test(expected = IllegalArgumentException.class)
public void testMergeResultsWithNegativeLimit()
{
GroupByQuery.Builder builder = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval("2011-04-02/2011-04-04")
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
.setLimit(Integer.valueOf(-1));
builder.build();
}
@Test
public void testMergeResultsWithOrderBy()
{
LimitSpec[] orderBySpecs = new LimitSpec[]{
new DefaultLimitSpec(OrderByColumnSpec.ascending("idx"), null),
new DefaultLimitSpec(OrderByColumnSpec.ascending("rows", "idx"), null),
new DefaultLimitSpec(OrderByColumnSpec.descending("idx"), null),
new DefaultLimitSpec(OrderByColumnSpec.descending("rows", "idx"), null),
};
final Comparator<Row> idxComparator =
new Comparator<Row>()
{
@Override
public int compare(Row o1, Row o2)
{
return Float.compare(o1.getFloatMetric("idx"), o2.getFloatMetric("idx"));
}
};
Comparator<Row> rowsIdxComparator =
new Comparator<Row>()
{
@Override
public int compare(Row o1, Row o2)
{
int value = Float.compare(o1.getFloatMetric("rows"), o2.getFloatMetric("rows"));
if (value != 0) {
return value;
}
return idxComparator.compare(o1, o2);
}
};
List<Row> allResults = Arrays.asList(
createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 269L),
createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L),
createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 319L),
createExpectedRow("2011-04-01", "alias", "health", "rows", 2L, "idx", 216L),
createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
createExpectedRow("2011-04-01", "alias", "news", "rows", 2L, "idx", 221L),
createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L),
createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 177L),
createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L)
);
List<List<Row>> expectedResults = Lists.newArrayList(
Ordering.from(idxComparator).sortedCopy(allResults),
Ordering.from(rowsIdxComparator).sortedCopy(allResults),
Ordering.from(idxComparator).reverse().sortedCopy(allResults),
Ordering.from(rowsIdxComparator).reverse().sortedCopy(allResults)
);
for (int i = 0; i < orderBySpecs.length; ++i) {
doTestMergeResultsWithOrderBy(orderBySpecs[i], expectedResults.get(i));
}
}
private void doTestMergeResultsWithOrderBy(LimitSpec orderBySpec, List<Row> expectedResults)
{
GroupByQuery.Builder builder = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval("2011-04-02/2011-04-04")
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
.setLimitSpec(orderBySpec);
final GroupByQuery fullQuery = builder.build();
QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults(
new QueryRunner<Row>()
{
@Override
public Sequence run(Query<Row> query)
{
// simulate two daily segments
final Query query1 = query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03")))
);
final Query query2 = query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
);
return Sequences.concat(runner.run(query1), runner.run(query2));
}
}
);
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
} }
@Test @Test
@ -403,6 +580,59 @@ public class GroupByQueryRunnerTest
); );
} }
@Test
public void testHavingSpec()
{
List<Row> expectedResults = Arrays.asList(
createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L),
createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L)
);
GroupByQuery.Builder builder = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval("2011-04-02/2011-04-04")
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
.setHavingSpec(
new OrHavingSpec(
ImmutableList.of(
new GreaterThanHavingSpec("rows", 2L),
new EqualToHavingSpec("idx", 217L)
)
)
);
final GroupByQuery fullQuery = builder.build();
QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults(
new QueryRunner<Row>()
{
@Override
public Sequence run(Query<Row> query)
{
// simulate two daily segments
final Query query1 = query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03")))
);
final Query query2 = query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
);
return Sequences.concat(runner.run(query1), runner.run(query2));
}
}
);
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
}
private Row createExpectedRow(final String timestamp, Object... vals) private Row createExpectedRow(final String timestamp, Object... vals)
{ {
return createExpectedRow(new DateTime(timestamp), vals); return createExpectedRow(new DateTime(timestamp), vals);
@ -413,10 +643,10 @@ public class GroupByQueryRunnerTest
Preconditions.checkArgument(vals.length % 2 == 0); Preconditions.checkArgument(vals.length % 2 == 0);
Map<String, Object> theVals = Maps.newHashMap(); Map<String, Object> theVals = Maps.newHashMap();
for (int i = 0; i < vals.length; i+=2) { for (int i = 0; i < vals.length; i += 2) {
theVals.put(vals[i].toString(), vals[i+1]); theVals.put(vals[i].toString(), vals[i + 1]);
} }
return new MapBasedRow(timestamp, theVals); return new MapBasedRow(new DateTime(timestamp), theVals);
} }
} }

View File

@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId> <artifactId>druid-services</artifactId>
<name>druid-services</name> <name>druid-services</name>
<description>druid-services</description> <description>druid-services</description>
<version>0.4.9-SNAPSHOT</version> <version>0.4.15-SNAPSHOT</version>
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.4.9-SNAPSHOT</version> <version>0.4.15-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>