mirror of https://github.com/apache/druid.git
Merge branch 'master' of https://github.com/metamx/druid
commit 4c092cb280
@@ -18,8 +18,7 @@
   ~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.metamx.druid</groupId>
    <artifactId>druid-client</artifactId>
@@ -29,7 +28,7 @@
    <parent>
        <groupId>com.metamx</groupId>
        <artifactId>druid</artifactId>
-        <version>0.4.1-SNAPSHOT</version>
+        <version>0.4.7-SNAPSHOT</version>
    </parent>
 
    <dependencies>

@@ -137,7 +137,7 @@ public class Announcer
         @Override
         public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
         {
-          log.info("Path[%s] got event[%s]", parentPath, event);
+          log.debug("Path[%s] got event[%s]", parentPath, event);
           switch (event.getType()) {
             case CHILD_REMOVED:
               final ChildData child = event.getData();

@@ -0,0 +1,27 @@
+package com.metamx.druid.curator.discovery;
+
+import com.google.common.base.Throwables;
+import org.apache.curator.x.discovery.ServiceInstance;
+
+public class AddressPortServiceInstanceFactory implements ServiceInstanceFactory<Void>
+{
+  private final String address;
+  private final int port;
+
+  public AddressPortServiceInstanceFactory(String address, int port)
+  {
+    this.address = address;
+    this.port = port;
+  }
+
+  @Override
+  public ServiceInstance<Void> create(String service)
+  {
+    try {
+      return ServiceInstance.<Void>builder().name(service).address(address).port(port).build();
+    }
+    catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}

@@ -0,0 +1,81 @@
+package com.metamx.druid.curator.discovery;
+
+import com.google.common.collect.Maps;
+import com.metamx.common.logger.Logger;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceInstance;
+
+import java.util.Map;
+
+/**
+ * Uses the Curator Service Discovery recipe to announce services.
+ */
+public class CuratorServiceAnnouncer<T> implements ServiceAnnouncer
+{
+  private static final Logger log = new Logger(CuratorServiceAnnouncer.class);
+
+  private final ServiceDiscovery<T> discovery;
+  private final ServiceInstanceFactory<T> instanceFactory;
+  private final Map<String, ServiceInstance<T>> instanceMap = Maps.newHashMap();
+  private final Object monitor = new Object();
+
+  public CuratorServiceAnnouncer(
+      ServiceDiscovery<T> discovery,
+      ServiceInstanceFactory<T> instanceFactory
+  )
+  {
+    this.discovery = discovery;
+    this.instanceFactory = instanceFactory;
+  }
+
+  @Override
+  public void announce(String service) throws Exception
+  {
+    final ServiceInstance<T> instance;
+
+    synchronized (monitor) {
+      if (instanceMap.containsKey(service)) {
+        log.warn("Ignoring request to announce service[%s]", service);
+        return;
+      } else {
+        instance = instanceFactory.create(service);
+        instanceMap.put(service, instance);
+      }
+    }
+
+    try {
+      log.info("Announcing service[%s]", service);
+      discovery.registerService(instance);
+    } catch (Exception e) {
+      log.warn("Failed to announce service[%s]", service);
+      synchronized (monitor) {
+        instanceMap.remove(service);
+      }
+    }
+  }
+
+  @Override
+  public void unannounce(String service) throws Exception
+  {
+    final ServiceInstance<T> instance;
+
+    synchronized (monitor) {
+      instance = instanceMap.get(service);
+      if (instance == null) {
+        log.warn("Ignoring request to unannounce service[%s]", service);
+        return;
+      }
+    }
+
+    log.info("Unannouncing service[%s]", service);
+    try {
+      discovery.unregisterService(instance);
+    } catch (Exception e) {
+      log.warn(e, "Failed to unannounce service[%s]", service);
+    } finally {
+      synchronized (monitor) {
+        instanceMap.remove(service);
+      }
+    }
+  }
+}

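Editor's note: taken together, AddressPortServiceInstanceFactory and CuratorServiceAnnouncer announce a fixed address/port under a service name. A minimal wiring sketch, not part of this commit; the already-started CuratorFramework `curator`, the base path, and the address/port values are assumptions:

// Sketch only: uses classes from this diff plus Curator's public discovery API.
ServiceDiscovery<Void> discovery = ServiceDiscoveryBuilder.builder(Void.class)
    .basePath("/druid/discovery")   // assumed discovery base path
    .client(curator)                // assumed started CuratorFramework
    .build();
discovery.start();

ServiceInstanceFactory<Void> factory = new AddressPortServiceInstanceFactory("10.0.0.1", 8080);
ServiceAnnouncer announcer = new CuratorServiceAnnouncer<Void>(discovery, factory);

announcer.announce("druid:broker");    // a second announce for the same name only logs a warning
announcer.unannounce("druid:broker");  // unregisters and forgets the instance
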
@@ -0,0 +1,19 @@
+package com.metamx.druid.curator.discovery;
+
+/**
+ * Does nothing.
+ */
+public class NoopServiceAnnouncer implements ServiceAnnouncer
+{
+  @Override
+  public void unannounce(String service)
+  {
+
+  }
+
+  @Override
+  public void announce(String service)
+  {
+
+  }
+}

@@ -0,0 +1,11 @@
+package com.metamx.druid.curator.discovery;
+
+/**
+ * Announces our ability to serve a particular function. Multiple users may announce the same service, in which
+ * case they are treated as interchangeable instances of that service.
+ */
+public interface ServiceAnnouncer
+{
+  public void announce(String service) throws Exception;
+  public void unannounce(String service) throws Exception;
+}

@@ -0,0 +1,8 @@
+package com.metamx.druid.curator.discovery;
+
+import org.apache.curator.x.discovery.ServiceInstance;
+
+public interface ServiceInstanceFactory<T>
+{
+  public ServiceInstance<T> create(String service);
+}

@@ -42,6 +42,8 @@ import com.metamx.druid.client.cache.MapCache
 import com.metamx.druid.client.cache.MapCacheConfig;
 import com.metamx.druid.client.cache.MemcachedCache;
 import com.metamx.druid.client.cache.MemcachedCacheConfig;
+import com.metamx.druid.curator.discovery.ServiceAnnouncer;
+import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
 import com.metamx.druid.initialization.Initialization;
 import com.metamx.druid.initialization.ServiceDiscoveryConfig;
 import com.metamx.druid.jackson.DefaultObjectMapper;

@@ -225,15 +227,17 @@ public class BrokerNode extends QueryableNode<BrokerNode>
   {
     if (useDiscovery) {
       final Lifecycle lifecycle = getLifecycle();
 
       final ServiceDiscoveryConfig serviceDiscoveryConfig = getConfigFactory().build(ServiceDiscoveryConfig.class);
-      CuratorFramework curatorFramework = Initialization.makeCuratorFramework(
+      final CuratorFramework curatorFramework = Initialization.makeCuratorFramework(
          serviceDiscoveryConfig, lifecycle
       );
 
       final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
           curatorFramework, serviceDiscoveryConfig, lifecycle
       );
+      final ServiceAnnouncer serviceAnnouncer = Initialization.makeServiceAnnouncer(
+          serviceDiscoveryConfig, serviceDiscovery
+      );
+      Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, lifecycle);
     }
   }
 

@@ -28,6 +28,10 @@ import com.metamx.common.config.Config;
 import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
+import com.metamx.druid.curator.discovery.AddressPortServiceInstanceFactory;
+import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
+import com.metamx.druid.curator.discovery.ServiceAnnouncer;
+import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
 import com.metamx.druid.http.EmittingRequestLogger;
 import com.metamx.druid.http.FileRequestLogger;
 import com.metamx.druid.http.RequestLogger;

@@ -121,29 +125,39 @@ public class Initialization
     if (tmp_props.getProperty(zkHostsProperty) != null) {
       final ConfigurationObjectFactory factory = Config.createFactory(tmp_props);
 
-      Lifecycle lifecycle = new Lifecycle();
-      try {
-        final ZkPathsConfig config = factory.build(ZkPathsConfig.class);
-        CuratorFramework curator = makeCuratorFramework(factory.build(CuratorConfig.class), lifecycle);
-
-        lifecycle.start();
-
-        final Stat stat = curator.checkExists().forPath(config.getPropertiesPath());
-        if (stat != null) {
-          final byte[] data = curator.getData().forPath(config.getPropertiesPath());
-          zkProps.load(new InputStreamReader(new ByteArrayInputStream(data), Charsets.UTF_8));
-        }
-
-        // log properties from zk
-        for (String prop : zkProps.stringPropertyNames()) {
-          log.info("Loaded(zk) Property[%s] as [%s]", prop, zkProps.getProperty(prop));
-        }
-      }
-      catch (Exception e) {
-        throw Throwables.propagate(e);
-      }
-      finally {
-        lifecycle.stop();
-      }
+      ZkPathsConfig config;
+      try {
+        config = factory.build(ZkPathsConfig.class);
+      }
+      catch (IllegalArgumentException e) {
+        log.warn(e, "Unable to build ZkPathsConfig. Cannot load properties from ZK.");
+        config = null;
+      }
+
+      if (config != null) {
+        Lifecycle lifecycle = new Lifecycle();
+        try {
+          CuratorFramework curator = makeCuratorFramework(factory.build(CuratorConfig.class), lifecycle);
+
+          lifecycle.start();
+
+          final Stat stat = curator.checkExists().forPath(config.getPropertiesPath());
+          if (stat != null) {
+            final byte[] data = curator.getData().forPath(config.getPropertiesPath());
+            zkProps.load(new InputStreamReader(new ByteArrayInputStream(data), Charsets.UTF_8));
+          }
+
+          // log properties from zk
+          for (String prop : zkProps.stringPropertyNames()) {
+            log.info("Loaded(zk) Property[%s] as [%s]", prop, zkProps.getProperty(prop));
+          }
+        }
+        catch (Exception e) {
+          throw Throwables.propagate(e);
+        }
+        finally {
+          lifecycle.stop();
+        }
+      }
     } else {
       log.warn("property[%s] not set, skipping ZK-specified properties.", zkHostsProperty);

@@ -214,17 +228,10 @@ public class Initialization
   )
       throws Exception
   {
-    final ServiceInstance serviceInstance =
-        ServiceInstance.builder()
-                       .name(config.getServiceName().replace('/', ':'))
-                       .address(addressFromHost(config.getHost()))
-                       .port(config.getPort())
-                       .build();
     final ServiceDiscovery serviceDiscovery =
         ServiceDiscoveryBuilder.builder(Void.class)
                                .basePath(config.getDiscoveryPath())
                                .client(discoveryClient)
-                               .thisInstance(serviceInstance)
                                .build();
 
     lifecycle.addHandler(

@@ -252,6 +259,46 @@ public class Initialization
     return serviceDiscovery;
   }
 
+  public static ServiceAnnouncer makeServiceAnnouncer(
+      ServiceDiscoveryConfig config,
+      ServiceDiscovery serviceDiscovery
+  )
+  {
+    final ServiceInstanceFactory serviceInstanceFactory = makeServiceInstanceFactory(config);
+    return new CuratorServiceAnnouncer(serviceDiscovery, serviceInstanceFactory);
+  }
+
+  public static void announceDefaultService(
+      final ServiceDiscoveryConfig config,
+      final ServiceAnnouncer serviceAnnouncer,
+      final Lifecycle lifecycle
+  ) throws Exception
+  {
+    final String service = config.getServiceName().replace('/', ':');
+
+    lifecycle.addHandler(
+        new Lifecycle.Handler()
+        {
+          @Override
+          public void start() throws Exception
+          {
+            serviceAnnouncer.announce(service);
+          }
+
+          @Override
+          public void stop()
+          {
+            try {
+              serviceAnnouncer.unannounce(service);
+            }
+            catch (Exception e) {
+              log.warn(e, "Failed to unannouce default service[%s]", service);
+            }
+          }
+        }
+    );
+  }
+
   public static ServiceProvider makeServiceProvider(
       String serviceName,
       ServiceDiscovery serviceDiscovery,

@@ -310,13 +357,17 @@ public class Initialization
     );
   }
 
-  public static String addressFromHost(final String host)
+  public static ServiceInstanceFactory<Void> makeServiceInstanceFactory(ServiceDiscoveryConfig config)
   {
+    final String host = config.getHost();
+    final String address;
     final int colon = host.indexOf(':');
     if (colon < 0) {
-      return host;
+      address = host;
     } else {
-      return host.substring(0, colon);
+      address = host.substring(0, colon);
     }
+
+    return new AddressPortServiceInstanceFactory(address, config.getPort());
   }
 }

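Editor's note: the intended wiring of these Initialization helpers is visible in the BrokerNode hunk above; a condensed sketch of the startup order (config objects assumed built elsewhere). Note that makeServiceInstanceFactory only strips a ':port' suffix from the configured host to get the address; the announced port always comes from config.getPort().

final Lifecycle lifecycle = new Lifecycle();
final CuratorFramework curatorFramework = Initialization.makeCuratorFramework(serviceDiscoveryConfig, lifecycle);
final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
    curatorFramework, serviceDiscoveryConfig, lifecycle
);
final ServiceAnnouncer serviceAnnouncer = Initialization.makeServiceAnnouncer(serviceDiscoveryConfig, serviceDiscovery);
Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, lifecycle);
lifecycle.start();  // the Lifecycle.Handler registered by announceDefaultService announces here
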
@@ -19,6 +19,7 @@
 
 package com.metamx.druid.initialization;
 
+import org.apache.curator.utils.ZKPaths;
 import org.skife.config.Config;
 
 public abstract class ZkPathsConfig

@@ -56,7 +57,32 @@ public abstract class ZkPathsConfig
     return defaultPath("master");
   }
 
-  private String defaultPath(final String subPath) {
-    return String.format("%s/%s", getZkBasePath(), subPath);
+  @Config("druid.zk.paths.indexer.announcementsPath")
+  public String getIndexerAnnouncementPath()
+  {
+    return defaultPath("indexer/announcements");
+  }
+
+  @Config("druid.zk.paths.indexer.tasksPath")
+  public String getIndexerTaskPath()
+  {
+    return defaultPath("indexer/tasks");
+  }
+
+  @Config("druid.zk.paths.indexer.statusPath")
+  public String getIndexerStatusPath()
+  {
+    return defaultPath("indexer/status");
+  }
+
+  @Config("druid.zk.paths.indexer.leaderLatchPath")
+  public String getIndexerLeaderLatchPath()
+  {
+    return defaultPath("indexer/leaderLatchPath");
+  }
+
+  private String defaultPath(final String subPath)
+  {
+    return ZKPaths.makePath(getZkBasePath(), subPath);
   }
 }

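Editor's note: the defaultPath change from String.format to ZKPaths.makePath is not purely cosmetic; makePath normalizes slashes. A sketch with hypothetical base-path values:

// With getZkBasePath() == "/druid":
String.format("%s/%s", "/druid", "indexer/tasks");   // "/druid/indexer/tasks"
ZKPaths.makePath("/druid", "indexer/tasks");         // "/druid/indexer/tasks"
// With a trailing slash, getZkBasePath() == "/druid/":
String.format("%s/%s", "/druid/", "indexer/tasks");  // "/druid//indexer/tasks" (double slash, invalid ZK path)
ZKPaths.makePath("/druid/", "indexer/tasks");        // "/druid/indexer/tasks" (normalized)
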
@@ -21,9 +21,12 @@ package com.metamx.druid.query.group;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.metamx.common.ISE;
+import com.metamx.common.guava.Sequence;
 import com.metamx.druid.BaseQuery;
 import com.metamx.druid.Query;
 import com.metamx.druid.QueryGranularity;

@@ -34,6 +37,10 @@ import com.metamx.druid.query.Queries;
 import com.metamx.druid.query.dimension.DefaultDimensionSpec;
 import com.metamx.druid.query.dimension.DimensionSpec;
 import com.metamx.druid.query.filter.DimFilter;
+import com.metamx.druid.query.group.limit.DefaultLimitSpec;
+import com.metamx.druid.query.group.limit.LimitSpec;
+import com.metamx.druid.query.group.limit.NoopLimitSpec;
+import com.metamx.druid.query.group.limit.OrderByColumnSpec;
 import com.metamx.druid.query.segment.LegacySegmentSpec;
 import com.metamx.druid.query.segment.QuerySegmentSpec;
 

@@ -49,16 +56,20 @@ public class GroupByQuery extends BaseQuery<Row>
     return new Builder();
   }
 
+  private final LimitSpec limitSpec;
   private final DimFilter dimFilter;
   private final QueryGranularity granularity;
   private final List<DimensionSpec> dimensions;
   private final List<AggregatorFactory> aggregatorSpecs;
   private final List<PostAggregator> postAggregatorSpecs;
 
+  private final Function<Sequence<Row>, Sequence<Row>> orderByLimitFn;
+
   @JsonCreator
   public GroupByQuery(
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
+      @JsonProperty("limitSpec") LimitSpec limitSpec,
       @JsonProperty("filter") DimFilter dimFilter,
       @JsonProperty("granularity") QueryGranularity granularity,
       @JsonProperty("dimensions") List<DimensionSpec> dimensions,

@@ -68,6 +79,7 @@ public class GroupByQuery extends BaseQuery<Row>
   )
   {
     super(dataSource, querySegmentSpec, context);
+    this.limitSpec = (limitSpec == null) ? new NoopLimitSpec() : limitSpec;
     this.dimFilter = dimFilter;
     this.granularity = granularity;
     this.dimensions = dimensions == null ? ImmutableList.<DimensionSpec>of() : dimensions;

@@ -77,6 +89,14 @@ public class GroupByQuery extends BaseQuery<Row>
     Preconditions.checkNotNull(this.granularity, "Must specify a granularity");
     Preconditions.checkNotNull(this.aggregatorSpecs, "Must specify at least one aggregator");
     Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs);
+
+    orderByLimitFn = this.limitSpec.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);
   }
 
+  @JsonProperty
+  public LimitSpec getLimitSpec()
+  {
+    return limitSpec;
+  }
+
   @JsonProperty("filter")

@@ -121,12 +141,18 @@ public class GroupByQuery extends BaseQuery<Row>
     return Query.GROUP_BY;
   }
 
+  public Sequence<Row> applyLimit(Sequence<Row> results)
+  {
+    return orderByLimitFn.apply(results);
+  }
+
   @Override
   public GroupByQuery withOverriddenContext(Map<String, String> contextOverride)
   {
     return new GroupByQuery(
         getDataSource(),
         getQuerySegmentSpec(),
+        limitSpec,
         dimFilter,
         granularity,
         dimensions,

@@ -142,6 +168,7 @@ public class GroupByQuery extends BaseQuery<Row>
     return new GroupByQuery(
         getDataSource(),
         spec,
+        limitSpec,
         dimFilter,
         granularity,
         dimensions,

@@ -162,12 +189,17 @@ public class GroupByQuery extends BaseQuery<Row>
     private List<PostAggregator> postAggregatorSpecs;
     private Map<String, String> context;
 
+    private LimitSpec limitSpec = null;
+    private List<OrderByColumnSpec> orderByColumnSpecs = Lists.newArrayList();
+    private int limit = Integer.MAX_VALUE;
+
     private Builder() {}
 
     private Builder(Builder builder)
     {
       dataSource = builder.dataSource;
       querySegmentSpec = builder.querySegmentSpec;
+      limitSpec = builder.limitSpec;
       dimFilter = builder.dimFilter;
       granularity = builder.granularity;
       dimensions = builder.dimensions;

@@ -187,6 +219,56 @@ public class GroupByQuery extends BaseQuery<Row>
       return setQuerySegmentSpec(new LegacySegmentSpec(interval));
     }
 
+    public Builder limit(int limit)
+    {
+      ensureExplicitLimitNotSet();
+      this.limit = limit;
+      return this;
+    }
+
+    public Builder addOrderByColumn(String dimension)
+    {
+      return addOrderByColumn(dimension, (OrderByColumnSpec.Direction) null);
+    }
+
+    public Builder addOrderByColumn(String dimension, String direction)
+    {
+      return addOrderByColumn(dimension, OrderByColumnSpec.determineDirection(direction));
+    }
+
+    public Builder addOrderByColumn(String dimension, OrderByColumnSpec.Direction direction)
+    {
+      return addOrderByColumn(new OrderByColumnSpec(dimension, direction));
+    }
+
+    public Builder addOrderByColumn(OrderByColumnSpec columnSpec)
+    {
+      ensureExplicitLimitNotSet();
+      this.orderByColumnSpecs.add(columnSpec);
+      return this;
+    }
+
+    public Builder setLimitSpec(LimitSpec limitSpec)
+    {
+      ensureFluentLimitsNotSet();
+      this.limitSpec = limitSpec;
+      return this;
+    }
+
+    private void ensureExplicitLimitNotSet()
+    {
+      if (limitSpec != null) {
+        throw new ISE("Ambiguous build, limitSpec[%s] already set", limitSpec);
+      }
+    }
+
+    private void ensureFluentLimitsNotSet()
+    {
+      if (! (limit == Integer.MAX_VALUE && orderByColumnSpecs.isEmpty()) ) {
+        throw new ISE("Ambiguous build, limit[%s] or columnSpecs[%s] already set.", limit, orderByColumnSpecs);
+      }
+    }
+
     public Builder setQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
     {
       this.querySegmentSpec = querySegmentSpec;

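Editor's note: a sketch of the new fluent order-by/limit API. The data source, interval, dimension, and aggregator are hypothetical, and setDataSource/setGranularity/addDimension/addAggregator are assumed pre-existing Builder methods (only setInterval is visible in this hunk):

GroupByQuery query = GroupByQuery.builder()
    .setDataSource("wikipedia")
    .setInterval("2013-01-01/2013-01-02")
    .setGranularity(QueryGranularity.ALL)
    .addDimension("page")
    .addAggregator(new LongSumAggregatorFactory("edits", "count"))
    .addOrderByColumn("edits", "descending")  // parsed via OrderByColumnSpec.determineDirection
    .limit(10)
    .build();  // build() wraps these into new DefaultLimitSpec(orderByColumnSpecs, 10)

Calling setLimitSpec after limit() or addOrderByColumn() (or vice versa) throws ISE, so the explicit and fluent styles cannot be mixed.
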
@@ -276,9 +358,18 @@ public class GroupByQuery extends BaseQuery<Row>
 
     public GroupByQuery build()
     {
+      final LimitSpec theLimitSpec;
+      if (limitSpec == null) {
+        theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
+      }
+      else {
+        theLimitSpec = limitSpec;
+      }
+
       return new GroupByQuery(
           dataSource,
           querySegmentSpec,
+          theLimitSpec,
           dimFilter,
           granularity,
           dimensions,

@@ -288,4 +379,18 @@ public class GroupByQuery extends BaseQuery<Row>
       );
     }
   }
+
+  @Override
+  public String toString()
+  {
+    return "GroupByQuery{" +
+           "limitSpec=" + limitSpec +
+           ", dimFilter=" + dimFilter +
+           ", granularity=" + granularity +
+           ", dimensions=" + dimensions +
+           ", aggregatorSpecs=" + aggregatorSpecs +
+           ", postAggregatorSpecs=" + postAggregatorSpecs +
+           ", orderByLimitFn=" + orderByLimitFn +
+           '}';
+  }
 }

@@ -21,10 +21,10 @@ package com.metamx.druid.query.group;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Function;
-import com.google.common.base.Functions;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.metamx.common.ISE;
 import com.metamx.common.guava.Accumulator;
 import com.metamx.common.guava.ConcatSequence;

@@ -44,7 +44,6 @@ import com.metamx.druid.query.QueryToolChest;
 import com.metamx.druid.query.dimension.DimensionSpec;
 import com.metamx.druid.utils.PropUtils;
 import com.metamx.emitter.service.ServiceMetricEvent;
-
 import org.joda.time.Interval;
 import org.joda.time.Minutes;
 

@@ -57,8 +56,9 @@ import java.util.Properties;
  */
 public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
 {
-
-  private static final TypeReference<Row> TYPE_REFERENCE = new TypeReference<Row>(){};
+  private static final TypeReference<Row> TYPE_REFERENCE = new TypeReference<Row>()
+  {
+  };
   private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
   private static final Map<String, String> NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false");
 

@@ -81,22 +81,21 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
       {
         if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
           return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner);
-        }
-        else {
+        } else {
           return runner.run(input);
         }
       }
     };
   }
 
-  private Sequence<Row> mergeGroupByResults(GroupByQuery query, QueryRunner<Row> runner)
+  private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
   {
     final QueryGranularity gran = query.getGranularity();
     final long timeStart = query.getIntervals().get(0).getStartMillis();
 
     // use gran.iterable instead of gran.truncate so that
     // AllGranularity returns timeStart instead of Long.MIN_VALUE
-    final long granTimeStart = gran.iterable(timeStart, timeStart+1).iterator().next();
+    final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next();
 
     final List<AggregatorFactory> aggs = Lists.transform(
         query.getAggregatorSpecs(),

@@ -144,7 +143,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
     );
 
     // convert millis back to timestamp according to granularity to preserve time zone information
-    return Sequences.map(
+    Sequence<Row> retVal = Sequences.map(
         Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
         new Function<Row, Row>()
         {

@@ -156,6 +155,8 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
           }
         }
     );
+
+    return query.applyLimit(retVal);
   }
 
   @Override

@@ -183,9 +184,24 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
   }
 
   @Override
-  public Function<Row, Row> makeMetricManipulatorFn(GroupByQuery query, MetricManipulationFn fn)
+  public Function<Row, Row> makeMetricManipulatorFn(final GroupByQuery query, final MetricManipulationFn fn)
   {
-    return Functions.identity();
+    return new Function<Row, Row>()
+    {
+      @Override
+      public Row apply(Row input)
+      {
+        if (input instanceof MapBasedRow) {
+          final MapBasedRow inputRow = (MapBasedRow) input;
+          final Map<String, Object> values = Maps.newHashMap(((MapBasedRow) input).getEvent());
+          for (AggregatorFactory agg : query.getAggregatorSpecs()) {
+            values.put(agg.getName(), fn.manipulate(agg, inputRow.getEvent().get(agg.getName())));
+          }
+          return new MapBasedRow(inputRow.getTimestamp(), values);
+        }
+        return input;
+      }
+    };
   }
 
   @Override

@@ -0,0 +1,177 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.query.group.limit;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.primitives.Longs;
+import com.metamx.common.ISE;
+import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Sequences;
+import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.aggregation.post.PostAggregator;
+import com.metamx.druid.input.Row;
+import com.metamx.druid.query.dimension.DimensionSpec;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class DefaultLimitSpec implements LimitSpec
+{
+  private final List<OrderByColumnSpec> columns;
+  private final int limit;
+
+  @JsonCreator
+  public DefaultLimitSpec(
+      @JsonProperty("columns") List<OrderByColumnSpec> columns,
+      @JsonProperty("limit") Integer limit
+  )
+  {
+    this.columns = (columns == null) ? ImmutableList.<OrderByColumnSpec>of() : columns;
+    this.limit = (limit == null) ? Integer.MAX_VALUE : limit;
+
+    Preconditions.checkState(limit > 0, "limit[%s] must be >0", limit);
+  }
+
+  @JsonProperty
+  public List<OrderByColumnSpec> getColumns()
+  {
+    return columns;
+  }
+
+  @JsonProperty
+  public int getLimit()
+  {
+    return limit;
+  }
+
+  @Override
+  public Function<Sequence<Row>, Sequence<Row>> build(
+      List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
+  )
+  {
+    // Materialize the Comparator first for fast-fail error checking.
+    final Comparator<Row> comparator = makeComparator(dimensions, aggs, postAggs);
+
+    return new Function<Sequence<Row>, Sequence<Row>>()
+    {
+      @Override
+      public Sequence<Row> apply(Sequence<Row> input)
+      {
+        return Sequences.limit(Sequences.sort(input, comparator), limit);
+      }
+    };
+  }
+
+  private Comparator<Row> makeComparator(
+      List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
+  )
+  {
+    Ordering<Row> ordering = new Ordering<Row>()
+    {
+      @Override
+      public int compare(Row left, Row right)
+      {
+        return Longs.compare(left.getTimestampFromEpoch(), right.getTimestampFromEpoch());
+      }
+    };
+
+    Map<String, Ordering<Row>> possibleOrderings = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+    for (DimensionSpec spec : dimensions) {
+      final String dimension = spec.getOutputName();
+      possibleOrderings.put(dimension, dimensionOrdering(dimension));
+    }
+
+    for (final AggregatorFactory agg : aggs) {
+      final String column = agg.getName();
+      possibleOrderings.put(column, metricOrdering(column, agg.getComparator()));
+    }
+
+    for (PostAggregator postAgg : postAggs) {
+      final String column = postAgg.getName();
+      possibleOrderings.put(column, metricOrdering(column, postAgg.getComparator()));
+    }
+
+    for (OrderByColumnSpec columnSpec : columns) {
+      Ordering<Row> nextOrdering = possibleOrderings.get(columnSpec.getDimension());
+
+      if (nextOrdering == null) {
+        throw new ISE("Unknown column in order clause[%s]", columnSpec);
+      }
+
+      switch (columnSpec.getDirection()) {
+        case DESCENDING:
+          nextOrdering = nextOrdering.reverse();
+      }
+
+      ordering = ordering.compound(nextOrdering);
+    }
+
+    return ordering;
+  }
+
+  private Ordering<Row> metricOrdering(final String column, final Comparator comparator)
+  {
+    return new Ordering<Row>()
+    {
+      @SuppressWarnings("unchecked")
+      @Override
+      public int compare(Row left, Row right)
+      {
+        return comparator.compare(left.getFloatMetric(column), right.getFloatMetric(column));
+      }
+    };
+  }
+
+  private Ordering<Row> dimensionOrdering(final String dimension)
+  {
+    return Ordering.natural()
+                   .nullsFirst()
+                   .onResultOf(
+                       new Function<Row, String>()
+                       {
+                         @Override
+                         public String apply(Row input)
+                         {
+                           // Multi-value dimensions have all been flattened at this point;
+                           final List<String> dimList = input.getDimension(dimension);
+                           return dimList.isEmpty() ? null : dimList.get(0);
+                         }
+                       }
+                   );
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DefaultLimitSpec{" +
+           "columns='" + columns + '\'' +
+           ", limit=" + limit +
+           '}';
+  }
+}

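Editor's note: a sketch of using the new spec directly; the dimensions/aggs/postAggs lists and the input sequence `rows` are assumed to exist:

LimitSpec limitSpec = new DefaultLimitSpec(
    ImmutableList.of(OrderByColumnSpec.create("edits")),  // plain string -> ASCENDING by default
    10
);
Function<Sequence<Row>, Sequence<Row>> fn = limitSpec.build(dimensions, aggs, postAggs);
Sequence<Row> limited = fn.apply(rows);  // timestamp-first compound sort, then truncated to 10 rows
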
@@ -0,0 +1,42 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.query.group.limit;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.google.common.base.Function;
+import com.metamx.common.guava.Sequence;
+import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.aggregation.post.PostAggregator;
+import com.metamx.druid.input.Row;
+import com.metamx.druid.query.dimension.DimensionSpec;
+
+import java.util.List;
+
+/**
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = NoopLimitSpec.class)
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "default", value = DefaultLimitSpec.class)
+})
+public interface LimitSpec
+{
+  public Function<Sequence<Row>, Sequence<Row>> build(List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs);
+}

@@ -0,0 +1,24 @@
+package com.metamx.druid.query.group.limit;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.metamx.common.guava.Sequence;
+import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.aggregation.post.PostAggregator;
+import com.metamx.druid.input.Row;
+import com.metamx.druid.query.dimension.DimensionSpec;
+
+import java.util.List;
+
+/**
+ */
+public class NoopLimitSpec implements LimitSpec
+{
+  @Override
+  public Function<Sequence<Row>, Sequence<Row>> build(
+      List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
+  )
+  {
+    return Functions.identity();
+  }
+}

@@ -0,0 +1,120 @@
+package com.metamx.druid.query.group.limit;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.metamx.common.IAE;
+import com.metamx.common.ISE;
+
+import java.util.Map;
+
+/**
+ */
+public class OrderByColumnSpec
+{
+  /**
+   * Maintain a map of the enum values so that we can just do a lookup and get a null if it doesn't exist instead
+   * of an exception thrown.
+   */
+  private static final Map<String, Direction> stupidEnumMap;
+
+  static
+  {
+    final ImmutableMap.Builder<String, Direction> bob = ImmutableMap.builder();
+    for (Direction direction : Direction.values()) {
+      bob.put(direction.toString(), direction);
+    }
+    stupidEnumMap = bob.build();
+  }
+
+  private final String dimension;
+  private final Direction direction;
+
+  @JsonCreator
+  public static OrderByColumnSpec create(Object obj)
+  {
+    Preconditions.checkNotNull(obj, "Cannot build an OrderByColumnSpec from a null object.");
+
+    if (obj instanceof String) {
+      return new OrderByColumnSpec(obj.toString(), null);
+    }
+    else if (obj instanceof Map) {
+      final Map map = (Map) obj;
+
+      final String dimension = map.get("dimension").toString();
+      final Direction direction = determineDirection(map.get("direction"));
+
+      return new OrderByColumnSpec(dimension, direction);
+    }
+    else {
+      throw new ISE("Cannot build an OrderByColumnSpec from a %s", obj.getClass());
+    }
+  }
+
+  public OrderByColumnSpec(
+      String dimension,
+      Direction direction
+  )
+  {
+    this.dimension = dimension;
+    this.direction = direction == null ? Direction.ASCENDING : direction;
+  }
+
+  @JsonProperty
+  public String getDimension()
+  {
+    return dimension;
+  }
+
+  @JsonProperty
+  public Direction getDirection()
+  {
+    return direction;
+  }
+
+  public static Direction determineDirection(Object directionObj)
+  {
+    if (directionObj == null) {
+      return null;
+    }
+
+    String directionString = directionObj.toString();
+
+    Direction direction = stupidEnumMap.get(directionString);
+
+    if (direction == null) {
+      final String lowerDimension = directionString.toLowerCase();
+
+      for (Direction dir : Direction.values()) {
+        if (dir.toString().toLowerCase().startsWith(lowerDimension)) {
+          if (direction != null) {
+            throw new ISE("Ambiguous directions[%s] and [%s]", direction, dir);
+          }
+          direction = dir;
+        }
+      }
+    }
+
+    if (direction == null) {
+      throw new IAE("Unknown direction[%s]", directionString);
+    }
+
+    return direction;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "OrderByColumnSpec{" +
+           "dimension='" + dimension + '\'' +
+           ", direction=" + direction +
+           '}';
+  }
+
+  public static enum Direction
+  {
+    ASCENDING,
+    DESCENDING
+  }
+}

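Editor's note: determineDirection accepts either the exact enum name or any unambiguous case-insensitive prefix; illustrative calls, following directly from the code above:

OrderByColumnSpec.determineDirection("DESCENDING"); // exact lookup in stupidEnumMap -> DESCENDING
OrderByColumnSpec.determineDirection("desc");       // unique lowercase prefix       -> DESCENDING
OrderByColumnSpec.determineDirection("a");          // unique prefix of ASCENDING    -> ASCENDING
OrderByColumnSpec.determineDirection("");           // prefix of both values         -> ISE (ambiguous)
OrderByColumnSpec.determineDirection("x");          // no match                      -> IAE (unknown)
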
@@ -28,7 +28,7 @@
    <parent>
        <groupId>com.metamx</groupId>
        <artifactId>druid</artifactId>
-        <version>0.4.1-SNAPSHOT</version>
+        <version>0.4.7-SNAPSHOT</version>
    </parent>
 
    <dependencies>

@@ -42,7 +42,7 @@ public abstract class DbConnectorConfig
   @Config("druid.database.segmentTable")
   public abstract String getSegmentTable();
 
-  @JsonProperty("validationQuery")
+  @JsonProperty("useValidationQuery")
   @Config("druid.database.validation")
   public boolean isValidationQuery() {
     return false;

@@ -52,9 +52,14 @@ fi
 JAVA_ARGS="-Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8"
 JAVA_ARGS="${JAVA_ARGS} -Ddruid.realtime.specFile=${SPEC_FILE}"
 
 
 DRUID_CP=${EXAMPLE_LOC}
-DRUID_CP=${DRUID_CP}:../config
+#For a pull
 DRUID_CP=${DRUID_CP}:`ls ../target/druid-examples-*-selfcontained.jar`
+DRUID_CP=${DRUID_CP}:../config
+#For the kit
+DRUID_CP=${DRUID_CP}:`ls ./lib/druid-examples-*-selfcontained.jar`
+DRUID_CP=${DRUID_CP}:./config
 
 echo "Running command:"

@@ -9,7 +9,7 @@
    <parent>
        <groupId>com.metamx</groupId>
        <artifactId>druid</artifactId>
-        <version>0.4.1-SNAPSHOT</version>
+        <version>0.4.7-SNAPSHOT</version>
    </parent>
 
    <dependencies>

@@ -27,8 +27,8 @@ import com.google.common.collect.Iterators;
 import com.google.common.io.Closeables;
 import com.metamx.druid.indexer.data.StringInputRowParser;
 import com.metamx.druid.input.InputRow;
-import com.metamx.druid.realtime.Firehose;
-import com.metamx.druid.realtime.FirehoseFactory;
+import com.metamx.druid.realtime.firehose.Firehose;
+import com.metamx.druid.realtime.firehose.FirehoseFactory;
 
 import java.io.BufferedReader;
 import java.io.File;

@@ -7,8 +7,8 @@ import com.google.common.collect.Maps;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.input.InputRow;
 import com.metamx.druid.input.MapBasedInputRow;
-import com.metamx.druid.realtime.Firehose;
-import com.metamx.druid.realtime.FirehoseFactory;
+import com.metamx.druid.realtime.firehose.Firehose;
+import com.metamx.druid.realtime.firehose.FirehoseFactory;
 
 import java.io.IOException;
 import java.util.LinkedList;

@@ -7,8 +7,8 @@ import com.google.common.collect.Lists;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.input.InputRow;
 import com.metamx.druid.input.MapBasedInputRow;
-import com.metamx.druid.realtime.Firehose;
-import com.metamx.druid.realtime.FirehoseFactory;
+import com.metamx.druid.realtime.firehose.Firehose;
+import com.metamx.druid.realtime.firehose.FirehoseFactory;
 import twitter4j.ConnectionLifeCycleListener;
 import twitter4j.HashtagEntity;
 import twitter4j.Status;

@@ -28,7 +28,7 @@
    <parent>
        <groupId>com.metamx</groupId>
        <artifactId>druid</artifactId>
-        <version>0.4.1-SNAPSHOT</version>
+        <version>0.4.7-SNAPSHOT</version>
    </parent>
 
    <dependencies>

@@ -0,0 +1,9 @@
+package com.metamx.druid.indexer.data;
+
+import com.metamx.druid.input.InputRow;
+
+public interface InputRowParser<T>
+{
+  public InputRow parse(T input);
+  public void addDimensionExclusion(String dimension);
+}

@@ -0,0 +1,83 @@
+package com.metamx.druid.indexer.data;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.metamx.druid.input.InputRow;
+import com.metamx.druid.input.MapBasedInputRow;
+import org.joda.time.DateTime;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MapInputRowParser implements InputRowParser<Map<String, Object>>
+{
+  private final TimestampSpec timestampSpec;
+  private final DataSpec dataSpec;
+  private final Set<String> dimensionExclusions;
+
+  @JsonCreator
+  public MapInputRowParser(
+      @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
+      @JsonProperty("data") DataSpec dataSpec,
+      @JsonProperty("dimensionExclusions") List<String> dimensionExclusions
+  )
+  {
+    this.timestampSpec = timestampSpec;
+    this.dataSpec = dataSpec;
+    this.dimensionExclusions = Sets.newHashSet();
+    if (dimensionExclusions != null) {
+      for (String dimensionExclusion : dimensionExclusions) {
+        this.dimensionExclusions.add(dimensionExclusion.toLowerCase());
+      }
+    }
+    this.dimensionExclusions.add(timestampSpec.getTimestampColumn().toLowerCase());
+  }
+
+  @Override
+  public InputRow parse(Map<String, Object> theMap)
+  {
+    final List<String> dimensions = dataSpec.hasCustomDimensions()
+                                    ? dataSpec.getDimensions()
+                                    : Lists.newArrayList(Sets.difference(theMap.keySet(), dimensionExclusions));
+
+    final DateTime timestamp = timestampSpec.extractTimestamp(theMap);
+    if (timestamp == null) {
+      final String input = theMap.toString();
+      throw new NullPointerException(
+          String.format(
+              "Null timestamp in input: %s",
+              input.length() < 100 ? input : input.substring(0, 100) + "..."
+          )
+      );
+    }
+
+    return new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap);
+  }
+
+  @Override
+  public void addDimensionExclusion(String dimension)
+  {
+    dimensionExclusions.add(dimension);
+  }
+
+  @JsonProperty
+  public TimestampSpec getTimestampSpec()
+  {
+    return timestampSpec;
+  }
+
+  @JsonProperty("data")
+  public DataSpec getDataSpec()
+  {
+    return dataSpec;
+  }
+
+  @JsonProperty
+  public Set<String> getDimensionExclusions()
+  {
+    return dimensionExclusions;
+  }
+}

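Editor's note: a sketch of the map-based parser in isolation; the timestampSpec/dataSpec construction is elided and the field names are hypothetical:

MapInputRowParser parser = new MapInputRowParser(timestampSpec, dataSpec, ImmutableList.of("robot"));
InputRow row = parser.parse(
    ImmutableMap.<String, Object>of(
        "timestamp", "2013-01-01T00:00:00Z",
        "page", "Druid",
        "robot", "false",
        "count", 1
    )
);
// With no custom dimensions, dimensions = map keys minus the exclusions ("robot")
// minus the timestamp column; a null extracted timestamp throws NullPointerException.
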
@@ -21,27 +21,20 @@ package com.metamx.druid.indexer.data;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import com.fasterxml.jackson.annotation.JsonValue;
 import com.metamx.common.exception.FormattedException;
 import com.metamx.common.parsers.Parser;
 import com.metamx.common.parsers.ToLowerCaseParser;
 import com.metamx.druid.input.InputRow;
-import com.metamx.druid.input.MapBasedInputRow;
-import org.joda.time.DateTime;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  */
-public class StringInputRowParser
+public class StringInputRowParser implements InputRowParser<String>
 {
-  private final TimestampSpec timestampSpec;
-  private final DataSpec dataSpec;
-
-  private final Set<String> dimensionExclusions;
+  private final InputRowParser<Map<String, Object>> inputRowCreator;
   private final Parser<String, Object> parser;
 
   @JsonCreator

@@ -51,62 +44,24 @@ public class StringInputRowParser
       @JsonProperty("dimensionExclusions") List<String> dimensionExclusions
   )
   {
-    this.timestampSpec = timestampSpec;
-    this.dataSpec = dataSpec;
-
-    this.dimensionExclusions = Sets.newHashSet();
-    if (dimensionExclusions != null) {
-      for (String dimensionExclusion : dimensionExclusions) {
-        this.dimensionExclusions.add(dimensionExclusion.toLowerCase());
-      }
-    }
-    this.dimensionExclusions.add(timestampSpec.getTimestampColumn());
-
+    this.inputRowCreator = new MapInputRowParser(timestampSpec, dataSpec, dimensionExclusions);
     this.parser = new ToLowerCaseParser(dataSpec.getParser());
   }
 
-  public StringInputRowParser addDimensionExclusion(String dimension)
+  public void addDimensionExclusion(String dimension)
   {
-    dimensionExclusions.add(dimension);
-    return this;
+    inputRowCreator.addDimensionExclusion(dimension);
   }
 
+  @Override
   public InputRow parse(String input) throws FormattedException
   {
-    Map<String, Object> theMap = parser.parse(input);
-
-    final List<String> dimensions = dataSpec.hasCustomDimensions()
-                                    ? dataSpec.getDimensions()
-                                    : Lists.newArrayList(Sets.difference(theMap.keySet(), dimensionExclusions));
-
-    final DateTime timestamp = timestampSpec.extractTimestamp(theMap);
-    if (timestamp == null) {
-      throw new NullPointerException(
-          String.format(
-              "Null timestamp in input string: %s",
-              input.length() < 100 ? input : input.substring(0, 100) + "..."
-          )
-      );
-    }
-
-    return new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap);
+    return inputRowCreator.parse(parser.parse(input));
   }
 
-  @JsonProperty
-  public TimestampSpec getTimestampSpec()
+  @JsonValue
+  public InputRowParser<Map<String, Object>> getInputRowCreator()
   {
-    return timestampSpec;
-  }
-
-  @JsonProperty("data")
-  public DataSpec getDataSpec()
-  {
-    return dataSpec;
-  }
-
-  @JsonProperty
-  public Set<String> getDimensionExclusions()
-  {
-    return dimensionExclusions;
+    return inputRowCreator;
   }
 }

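Editor's note: after this refactor StringInputRowParser is a thin two-stage pipeline; a sketch (the CSV line is hypothetical):

// String -> Map<String, Object>  (ToLowerCaseParser wrapping dataSpec.getParser())
// Map    -> InputRow             (the delegated MapInputRowParser)
InputRow row = stringInputRowParser.parse("2013-01-01T00:00:00Z,Druid,1");
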
@@ -28,7 +28,7 @@
    <parent>
        <groupId>com.metamx</groupId>
        <artifactId>druid</artifactId>
-        <version>0.4.1-SNAPSHOT</version>
+        <version>0.4.7-SNAPSHOT</version>
    </parent>
 
    <dependencies>

@@ -28,7 +28,7 @@
    <parent>
        <groupId>com.metamx</groupId>
        <artifactId>druid</artifactId>
-        <version>0.4.1-SNAPSHOT</version>
+        <version>0.4.7-SNAPSHOT</version>
    </parent>
 
    <dependencies>

@@ -19,22 +19,14 @@
 
 package com.metamx.druid.merger.common.config;
 
+import com.metamx.druid.initialization.ZkPathsConfig;
 import org.skife.config.Config;
 import org.skife.config.Default;
 
 /**
  */
-public abstract class IndexerZkConfig
+public abstract class IndexerZkConfig extends ZkPathsConfig
 {
-  @Config("druid.zk.paths.indexer.announcementsPath")
-  public abstract String getAnnouncementPath();
-
-  @Config("druid.zk.paths.indexer.tasksPath")
-  public abstract String getTaskPath();
-
-  @Config("druid.zk.paths.indexer.statusPath")
-  public abstract String getStatusPath();
-
   @Config("druid.zk.maxNumBytes")
   @Default("512000")
   public abstract long getMaxNumBytes();

@@ -0,0 +1,10 @@
+package com.metamx.druid.merger.common.index;
+
+/**
+ * Objects that can be registered with a {@link ChatHandlerProvider} and provide http endpoints for indexing-related
+ * objects. This interface is empty because it only exists to signal intent. The actual http endpoints are provided
+ * through JAX-RS annotations on the {@link ChatHandler} objects.
+ */
+public interface ChatHandler
+{
+}

@@ -0,0 +1,83 @@
+package com.metamx.druid.merger.common.index;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.metamx.common.ISE;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.curator.discovery.ServiceAnnouncer;
+import com.metamx.druid.merger.worker.config.ChatHandlerProviderConfig;
+
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Provides a way for the outside world to talk to objects in the indexing service. The {@link #get(String)} method
+ * allows anyone with a reference to this object to obtain a particular {@link ChatHandler}. An embedded
+ * {@link ServiceAnnouncer} will be used to advertise handlers on this host.
+ */
+public class ChatHandlerProvider
+{
+  private static final Logger log = new Logger(ChatHandlerProvider.class);
+
+  private final ChatHandlerProviderConfig config;
+  private final ServiceAnnouncer serviceAnnouncer;
+  private final ConcurrentMap<String, ChatHandler> handlers;
+
+  public ChatHandlerProvider(
+      ChatHandlerProviderConfig config,
+      ServiceAnnouncer serviceAnnouncer
+  )
+  {
+    this.config = config;
+    this.serviceAnnouncer = serviceAnnouncer;
+    this.handlers = Maps.newConcurrentMap();
+  }
+
+  public void register(final String key, ChatHandler handler)
+  {
+    final String service = serviceName(key);
+    log.info("Registering Eventhandler: %s", key);
+
+    if (handlers.putIfAbsent(key, handler) != null) {
+      throw new ISE("handler already registered for key: %s", key);
+    }
+
+    try {
+      serviceAnnouncer.announce(service);
+    }
+    catch (Exception e) {
+      log.warn(e, "Failed to register service: %s", service);
+      handlers.remove(key, handler);
+    }
+  }
+
+  public void unregister(final String key)
+  {
+    final String service = serviceName(key);
+
+    log.info("Unregistering chat handler: %s", key);
+
+    final ChatHandler handler = handlers.get(key);
+    if (handler == null) {
+      log.warn("handler not currently registered, ignoring: %s", key);
+    }
+
+    try {
+      serviceAnnouncer.unannounce(service);
+    }
+    catch (Exception e) {
+      log.warn(e, "Failed to unregister service: %s", service);
+    }
+
+    handlers.remove(key, handler);
+  }
+
+  public Optional<ChatHandler> get(final String key)
+  {
+    return Optional.fromNullable(handlers.get(key));
+  }
+
+  private String serviceName(String key)
+  {
+    return String.format(config.getServiceFormat(), key);
+  }
+}

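Editor's note: a usage sketch; the config, announcer, and handler instances are assumed, and the announced name is whatever String.format(config.getServiceFormat(), key) yields:

ChatHandlerProvider provider = new ChatHandlerProvider(chatHandlerProviderConfig, serviceAnnouncer);
provider.register("firehose-1", handler);           // announces the formatted service name
Optional<ChatHandler> found = provider.get("firehose-1");
provider.unregister("firehose-1");                  // unannounces and removes the handler
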
@@ -0,0 +1,9 @@
+package com.metamx.druid.merger.common.index;
+
+import java.util.Collection;
+import java.util.Map;
+
+public interface EventReceiver
+{
+  public void addAll(Collection<Map<String, Object>> events);
+}

@ -0,0 +1,193 @@
|
|||
package com.metamx.druid.merger.common.index;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.druid.indexer.data.MapInputRowParser;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.emitter.EmittingLogger;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Builds firehoses that accept events through the {@link EventReceiver} interface. Can also register these
 * firehoses with an {@link ChatHandlerProvider}.
 */
@JsonTypeName("receiver")
public class EventReceiverFirehoseFactory implements FirehoseFactory
{
  private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
  private static final int DEFAULT_BUFFER_SIZE = 100000;

  private final String firehoseId;
  private final int bufferSize;
  private final MapInputRowParser parser;
  private final Optional<ChatHandlerProvider> chatHandlerProvider;

  @JsonCreator
  public EventReceiverFirehoseFactory(
      @JsonProperty("firehoseId") String firehoseId,
      @JsonProperty("bufferSize") Integer bufferSize,
      @JsonProperty("parser") MapInputRowParser parser,
      @JacksonInject("chatHandlerProvider") ChatHandlerProvider chatHandlerProvider
  )
  {
    this.firehoseId = Preconditions.checkNotNull(firehoseId, "firehoseId");
    this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
    this.parser = Preconditions.checkNotNull(parser, "parser");
    this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
  }

  @Override
  public Firehose connect() throws IOException
  {
    log.info("Connecting firehose: %s", firehoseId);

    final EventReceiverFirehose firehose = new EventReceiverFirehose();

    if (chatHandlerProvider.isPresent()) {
      chatHandlerProvider.get().register(firehoseId, firehose);
    }

    return firehose;
  }

  @JsonProperty
  public String getFirehoseId()
  {
    return firehoseId;
  }

  @JsonProperty
  public int getBufferSize()
  {
    return bufferSize;
  }

  @JsonProperty
  public MapInputRowParser getParser()
  {
    return parser;
  }

  public class EventReceiverFirehose implements ChatHandler, Firehose
  {
    private final BlockingQueue<InputRow> buffer;
    private final Object readLock = new Object();
    private volatile InputRow nextRow = null;
    private volatile boolean closed = false;

    public EventReceiverFirehose()
    {
      this.buffer = new ArrayBlockingQueue<InputRow>(bufferSize);
    }

    @POST
    @Path("/push-events")
    @Produces("application/json")
    public Response addAll(Collection<Map<String, Object>> events)
    {
      log.debug("Adding %,d events to firehose: %s", events.size(), firehoseId);

      final List<InputRow> rows = Lists.newArrayList();
      for (final Map<String, Object> event : events) {
        // Might throw an exception. We'd like that to happen now, instead of while adding to the row buffer.
        rows.add(parser.parse(event));
      }

      try {
        for (final InputRow row : rows) {
          boolean added = false;
          while (!closed && !added) {
            added = buffer.offer(row, 500, TimeUnit.MILLISECONDS);
          }

          if (!added) {
            throw new IllegalStateException("Cannot add events to closed firehose!");
          }
        }

        return Response.ok().entity(ImmutableMap.of("eventCount", events.size())).build();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw Throwables.propagate(e);
      }
    }

    @Override
    public boolean hasMore()
    {
      synchronized (readLock) {
        try {
          while (!closed && nextRow == null) {
            nextRow = buffer.poll(500, TimeUnit.MILLISECONDS);
          }
        }
        catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw Throwables.propagate(e);
        }

        return nextRow != null;
      }
    }

    @Override
    public InputRow nextRow()
    {
      synchronized (readLock) {
        final InputRow row = nextRow;

        if (row == null) {
          throw new NoSuchElementException();
        } else {
          nextRow = null;
          return row;
        }
      }
    }

    @Override
    public Runnable commit()
    {
      return new Runnable()
      {
        @Override
        public void run()
        {
          // Nothing
        }
      };
    }

    @Override
    public void close() throws IOException
    {
      log.info("Firehose closing.");
      closed = true;

      if (chatHandlerProvider.isPresent()) {
        chatHandlerProvider.get().unregister(firehoseId);
      }
    }
  }
}
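A minimal client-side sketch, not part of this commit, of how events reach the firehose above: ChatHandlerResource (added later in this commit) binds at /mmx/worker/v1, its /chat/{id} locator returns the registered EventReceiverFirehose, and the @POST "/push-events" method parses and buffers the rows. The host, port, and firehose id below are hypothetical.

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class PushEventsSketch
    {
      public static void main(String[] args) throws Exception
      {
        // Hypothetical worker host/port and firehose id; the path segments come from
        // the @Path annotations on ChatHandlerResource and EventReceiverFirehose.
        final URL url = new URL("http://localhost:8080/mmx/worker/v1/chat/exampleFirehoseId/push-events");
        final byte[] body = "[{\"timestamp\":\"2013-06-01T00:00:00Z\",\"page\":\"ex\",\"count\":1}]".getBytes("UTF-8");

        final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        final OutputStream out = conn.getOutputStream();
        try {
          out.write(body); // each map in the JSON array goes through parser.parse(event)
        }
        finally {
          out.close();
        }
        // On success the endpoint answers 200 with {"eventCount": <n>}.
        System.out.println("HTTP " + conn.getResponseCode());
      }
    }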
@@ -31,8 +31,8 @@ import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;

@@ -35,8 +35,8 @@ import com.metamx.druid.input.InputRow;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.SpawnTasksAction;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.druid.shard.ShardSpec;

@@ -37,8 +37,8 @@ import com.metamx.druid.merger.common.actions.LockListAction;
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.merger.common.index.YeOldePlumberSchool;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.plumber.Sink;

@@ -32,7 +32,7 @@ import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.SpawnTasksAction;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec;
import org.joda.time.DateTime;
@@ -44,12 +44,10 @@ import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.realtime.FireDepartmentConfig;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.GracefulShutdownFirehose;
import com.metamx.druid.realtime.MinTimeFirehose;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.firehose.GracefulShutdownFirehose;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
import com.metamx.druid.realtime.plumber.Sink;

@@ -81,9 +79,6 @@ public class RealtimeIndexTask extends AbstractTask
  @JsonIgnore
  private final IndexGranularity segmentGranularity;

  @JsonIgnore
  private final DateTime minTime;

  @JsonIgnore
  private volatile Plumber plumber = null;

@@ -106,8 +101,7 @@ public class RealtimeIndexTask extends AbstractTask
      @JsonProperty("firehose") FirehoseFactory firehoseFactory,
      @JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig,
      @JsonProperty("windowPeriod") Period windowPeriod,
      @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
      @JsonProperty("minTime") DateTime minTime
      @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
  )
  {
    super(

@@ -128,7 +122,6 @@ public class RealtimeIndexTask extends AbstractTask
    this.fireDepartmentConfig = fireDepartmentConfig;
    this.windowPeriod = windowPeriod;
    this.segmentGranularity = segmentGranularity;
    this.minTime = minTime;
  }

  @Override

@@ -179,18 +172,12 @@ public class RealtimeIndexTask extends AbstractTask
      return TaskStatus.success(getId());
    }

    Firehose wrappedFirehose = firehoseFactory.connect();
    if (minTime != null) {
      log.info("Wrapping firehose in MinTimeFirehose with minTime[%s]", minTime);
      wrappedFirehose = new MinTimeFirehose(wrappedFirehose, minTime);
    }

    log.info(
        "Wrapping firehose in GracefulShutdownFirehose with segmentGranularity[%s] and windowPeriod[%s]",
        segmentGranularity,
        windowPeriod
    );
    firehose = new GracefulShutdownFirehose(wrappedFirehose, segmentGranularity, windowPeriod);
    firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod);
  }

  // It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for

@@ -381,12 +368,6 @@ public class RealtimeIndexTask extends AbstractTask
    return segmentGranularity;
  }

  @JsonProperty
  public DateTime getMinTime()
  {
    return minTime;
  }

  public static class TaskActionSegmentPublisher implements SegmentPublisher
  {
    final Task task;
@@ -430,7 +430,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
  private void cleanup(final String workerId, final String taskId)
  {
    runningTasks.remove(taskId);
    final String statusPath = JOINER.join(config.getStatusPath(), workerId, taskId);
    final String statusPath = JOINER.join(config.getIndexerStatusPath(), workerId, taskId);
    try {
      cf.delete().guaranteed().forPath(statusPath);
    }

@@ -493,7 +493,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
                .withMode(CreateMode.EPHEMERAL)
                .forPath(
                    JOINER.join(
                        config.getTaskPath(),
                        config.getIndexerTaskPath(),
                        theWorker.getHost(),
                        task.getId()
                    ),

@@ -522,7 +522,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
  private void addWorker(final Worker worker)
  {
    try {
      final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost());
      final String workerStatusPath = JOINER.join(config.getIndexerStatusPath(), worker.getHost());
      final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
      final ZkWorker zkWorker = new ZkWorker(
          worker,

@@ -626,18 +626,18 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
    try {
      Set<String> tasksToRetry = Sets.newHashSet(
          cf.getChildren()
            .forPath(JOINER.join(config.getTaskPath(), worker.getHost()))
            .forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))
      );
      tasksToRetry.addAll(
          cf.getChildren()
            .forPath(JOINER.join(config.getStatusPath(), worker.getHost()))
            .forPath(JOINER.join(config.getIndexerStatusPath(), worker.getHost()))
      );
      log.info("%s has %d tasks to retry", worker.getHost(), tasksToRetry.size());

      for (String taskId : tasksToRetry) {
        TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);
        if (taskRunnerWorkItem != null) {
          String taskPath = JOINER.join(config.getTaskPath(), worker.getHost(), taskId);
          String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), taskId);
          if (cf.checkExists().forPath(taskPath) != null) {
            cf.delete().guaranteed().forPath(taskPath);
          }
@@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.merger.common.actions.TaskActionClient;

@@ -68,6 +69,7 @@ public class TaskMasterLifecycle
      final TaskRunnerFactory runnerFactory,
      final ResourceManagementSchedulerFactory managementSchedulerFactory,
      final CuratorFramework curator,
      final ServiceAnnouncer serviceAnnouncer,
      final ServiceEmitter emitter
  )
  {

@@ -75,7 +77,7 @@ public class TaskMasterLifecycle
    this.taskActionClientFactory = taskActionClientFactory;

    this.leaderSelector = new LeaderSelector(
        curator, indexerCoordinatorConfig.getLeaderLatchPath(), new LeaderSelectorListener()
        curator, indexerCoordinatorConfig.getIndexerLeaderLatchPath(), new LeaderSelectorListener()
    {
      @Override
      public void takeLeadership(CuratorFramework client) throws Exception

@@ -101,7 +103,7 @@ public class TaskMasterLifecycle
            final Lifecycle leaderLifecycle = new Lifecycle();
            leaderLifecycle.addManagedInstance(taskQueue);
            leaderLifecycle.addManagedInstance(taskRunner);
            Initialization.makeServiceDiscoveryClient(curator, serviceDiscoveryConfig, leaderLifecycle);
            Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle);
            leaderLifecycle.addManagedInstance(taskConsumer);
            leaderLifecycle.addManagedInstance(resourceManagementScheduler);
@@ -109,11 +109,14 @@ public class TaskStorageQueryAdapter
    int nSuccesses = 0;
    int nFailures = 0;
    int nTotal = 0;
    int nPresent = 0;

    for(final Optional<TaskStatus> statusOption : statuses.values()) {
      nTotal ++;

      if(statusOption.isPresent()) {
        nPresent ++;

        final TaskStatus status = statusOption.get();

        if(status.isSuccess()) {

@@ -126,7 +129,7 @@ public class TaskStorageQueryAdapter

    final Optional<TaskStatus> status;

    if(nTotal == 0) {
    if(nPresent == 0) {
      status = Optional.absent();
    } else if(nSuccesses == nTotal) {
      status = Optional.of(TaskStatus.success(taskid));
@@ -21,6 +21,7 @@ package com.metamx.druid.merger.coordinator.config;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.skife.config.Config;
import org.skife.config.Default;
import org.skife.config.DefaultNull;

@@ -29,16 +30,13 @@ import java.util.Set;

/**
*/
public abstract class IndexerCoordinatorConfig
public abstract class IndexerCoordinatorConfig extends ZkPathsConfig
{
  private volatile Set<String> whitelistDatasources = null;

  @Config("druid.host")
  public abstract String getServerName();

  @Config("druid.zk.paths.indexer.leaderLatchPath")
  public abstract String getLeaderLatchPath();

  @Config("druid.merger.threads")
  @Default("1")
  public abstract int getNumLocalThreads();
@@ -21,6 +21,7 @@ package com.metamx.druid.merger.coordinator.http;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.ec2.AmazonEC2Client;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Charsets;

@@ -45,6 +46,9 @@ import com.metamx.druid.QueryableNode;
import com.metamx.druid.config.ConfigManager;
import com.metamx.druid.config.ConfigManagerConfig;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.http.GuiceServletConfig;

@@ -62,6 +66,7 @@ import com.metamx.druid.merger.common.actions.TaskActionToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.TaskLogConfig;
import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.common.tasklogs.NoopTaskLogs;
import com.metamx.druid.merger.common.tasklogs.S3TaskLogs;

@@ -109,6 +114,7 @@ import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;

@@ -150,6 +156,8 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode>
  private DBI dbi = null;
  private IndexerCoordinatorConfig config = null;
  private MergerDBCoordinator mergerDBCoordinator = null;
  private ServiceDiscovery serviceDiscovery = null;
  private ServiceAnnouncer serviceAnnouncer = null;
  private TaskStorage taskStorage = null;
  private TaskQueue taskQueue = null;
  private TaskLockbox taskLockbox = null;

@@ -253,6 +261,8 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode>
    initializeIndexerCoordinatorConfig();
    initializeMergeDBCoordinator();
    initializeJacksonSubtypes();
    initializeJacksonInjections();
    initializeServiceDiscovery();
    initializeTaskStorage();
    initializeTaskLockbox();
    initializeTaskQueue();

@@ -362,6 +372,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode>
          taskRunnerFactory,
          resourceManagementSchedulerFactory,
          getCuratorFramework(),
          serviceAnnouncer,
          emitter
      );
      getLifecycle().addManagedInstance(taskMasterLifecycle);

@@ -461,9 +472,21 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode>
    }
  }

  private void initializeJacksonInjections()
  {
    InjectableValues.Std injectables = new InjectableValues.Std();

    injectables.addValue("s3Client", null)
               .addValue("segmentPusher", null)
               .addValue("chatHandlerProvider", null);

    getJsonMapper().setInjectableValues(injectables);
  }

  private void initializeJacksonSubtypes()
  {
    getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class);
    getJsonMapper().registerSubtypes(EventReceiverFirehoseFactory.class);
  }

  private void initializeHttpClient()

@@ -541,6 +564,20 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode>
    }
  }

  public void initializeServiceDiscovery() throws Exception
  {
    final ServiceDiscoveryConfig config = getConfigFactory().build(ServiceDiscoveryConfig.class);
    if (serviceDiscovery == null) {
      this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
          getCuratorFramework(), config, getLifecycle()
      );
    }
    if (serviceAnnouncer == null) {
      final ServiceInstanceFactory instanceFactory = Initialization.makeServiceInstanceFactory(config);
      this.serviceAnnouncer = new CuratorServiceAnnouncer(serviceDiscovery, instanceFactory);
    }
  }

  public void initializeTaskQueue()
  {
    if (taskQueue == null) {

@@ -609,7 +646,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode>
            getJsonMapper(),
            getConfigFactory().build(RemoteTaskRunnerConfig.class),
            curator,
            new PathChildrenCache(curator, indexerZkConfig.getAnnouncementPath(), true),
            new PathChildrenCache(curator, indexerZkConfig.getIndexerAnnouncementPath(), true),
            retryScheduledExec,
            new RetryPolicyFactory(
                getConfigFactory().buildWithReplacements(
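For reference, a rough sketch (not part of this commit, assuming a started CuratorFramework and an arbitrary base path) of the Curator recipe that makeServiceDiscoveryClient wraps in initializeServiceDiscovery() above, and which CuratorServiceAnnouncer then uses to announce named services:

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.x.discovery.ServiceDiscovery;
    import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;

    public class DiscoverySketch
    {
      // Builds the Curator service-discovery client that the announcer consumes.
      public static ServiceDiscovery<Void> build(CuratorFramework curator) throws Exception
      {
        ServiceDiscovery<Void> discovery = ServiceDiscoveryBuilder.builder(Void.class)
            .client(curator)              // assumed already started
            .basePath("/druid/discovery") // hypothetical base path
            .build();
        discovery.start();
        return discovery;
      }
    }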
@@ -140,12 +140,7 @@ public class IndexerCoordinatorResource
  @Produces("application/json")
  public Response getTaskStatus(@PathParam("taskid") String taskid)
  {
    final Optional<TaskStatus> status = taskStorageQueryAdapter.getSameGroupMergedStatus(taskid);
    if (!status.isPresent()) {
      return Response.status(Response.Status.NOT_FOUND).build();
    } else {
      return Response.ok().entity(status.get()).build();
    }
    return optionalTaskResponse(taskid, "status", taskStorageQueryAdapter.getSameGroupMergedStatus(taskid));
  }

  @GET

@@ -351,6 +346,17 @@ public class IndexerCoordinatorResource
    }
  }

  public <T> Response optionalTaskResponse(String taskid, String objectType, Optional<T> x) {
    final Map<String, Object> results = Maps.newHashMap();
    results.put("task", taskid);
    if (x.isPresent()) {
      results.put(objectType, x.get());
      return Response.status(Response.Status.OK).entity(results).build();
    } else {
      return Response.status(Response.Status.NOT_FOUND).entity(results).build();
    }
  }

  public <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f)
  {
    if (x.isPresent()) {
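A small standalone sketch, not part of this commit, mirroring what the new optionalTaskResponse helper does: the task id is echoed in the entity in both outcomes, and the payload key is only attached when the Optional is present (200 vs. 404).

    import com.google.common.base.Optional;
    import java.util.HashMap;
    import java.util.Map;

    public class OptionalResponseSketch
    {
      // Mirrors optionalTaskResponse: echo the task id, attach the payload when present.
      public static Map<String, Object> entity(String taskid, String objectType, Optional<?> x)
      {
        final Map<String, Object> results = new HashMap<String, Object>();
        results.put("task", taskid);
        if (x.isPresent()) {
          results.put(objectType, x.get()); // served with 200 OK; otherwise 404
        }
        return results;
      }

      public static void main(String[] args)
      {
        // Hypothetical task id and status payload.
        System.out.println(entity("some_task", "status", Optional.of("SUCCESS")));
        System.out.println(entity("some_task", "status", Optional.absent()));
      }
    }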
@@ -71,9 +71,9 @@ public class WorkerCuratorCoordinator
    this.worker = worker;
    this.config = config;

    this.baseAnnouncementsPath = getPath(Arrays.asList(config.getAnnouncementPath(), worker.getHost()));
    this.baseTaskPath = getPath(Arrays.asList(config.getTaskPath(), worker.getHost()));
    this.baseStatusPath = getPath(Arrays.asList(config.getStatusPath(), worker.getHost()));
    this.baseAnnouncementsPath = getPath(Arrays.asList(config.getIndexerAnnouncementPath(), worker.getHost()));
    this.baseTaskPath = getPath(Arrays.asList(config.getIndexerTaskPath(), worker.getHost()));
    this.baseStatusPath = getPath(Arrays.asList(config.getIndexerStatusPath(), worker.getHost()));
  }

  @LifecycleStart
@@ -0,0 +1,17 @@
package com.metamx.druid.merger.worker.config;

import org.skife.config.Config;
import org.skife.config.DefaultNull;

public abstract class ChatHandlerProviderConfig
{
  @Config("druid.indexer.chathandler.service")
  @DefaultNull
  public abstract String getServiceFormat();

  @Config("druid.host")
  public abstract String getHost();

  @Config("druid.port")
  public abstract int getPort();
}
@@ -0,0 +1,39 @@
package com.metamx.druid.merger.worker.executor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.metamx.druid.merger.common.index.ChatHandler;
import com.metamx.druid.merger.common.index.ChatHandlerProvider;

import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;

@Path("/mmx/worker/v1")
public class ChatHandlerResource
{
  private final ObjectMapper jsonMapper;
  private final ChatHandlerProvider handlers;

  @Inject
  public ChatHandlerResource(ObjectMapper jsonMapper, ChatHandlerProvider handlers)
  {
    this.jsonMapper = jsonMapper;
    this.handlers = handlers;
  }

  @Path("/chat/{id}")
  public Object doTaskChat(
      @PathParam("id") String handlerId
  )
  {
    final Optional<ChatHandler> handler = handlers.get(handlerId);

    if (handler.isPresent()) {
      return handler.get();
    } else {
      return Response.status(Response.Status.NOT_FOUND).build();
    }
  }
}
@@ -37,7 +37,7 @@ public class ExecutorMain
  {
    LogLevelAdjuster.register();

    if (args.length != 3) {
    if (args.length != 2) {
      log.info("Usage: ExecutorMain <task.json> <status.json>");
      System.exit(2);
    }
@@ -25,6 +25,9 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;

@@ -33,6 +36,11 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.NoopServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;

@@ -48,8 +56,11 @@ import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.merger.common.index.ChatHandlerProvider;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner;
import com.metamx.druid.merger.worker.config.ChatHandlerProviderConfig;
import com.metamx.druid.merger.worker.config.WorkerConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;

@@ -70,6 +81,7 @@ import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;

@@ -103,10 +115,12 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
  private DataSegmentPusher segmentPusher = null;
  private TaskToolboxFactory taskToolboxFactory = null;
  private ServiceDiscovery serviceDiscovery = null;
  private ServiceAnnouncer serviceAnnouncer = null;
  private ServiceProvider coordinatorServiceProvider = null;
  private Server server = null;
  private ExecutorServiceTaskRunner taskRunner = null;
  private ExecutorLifecycle executorLifecycle = null;
  private ChatHandlerProvider chatHandlerProvider = null;

  public ExecutorNode(
      String nodeType,

@@ -177,10 +191,10 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
    initializeMonitors();
    initializeMergerConfig();
    initializeServiceDiscovery();
    initializeCoordinatorServiceProvider();
    initializeDataSegmentPusher();
    initializeTaskToolbox();
    initializeTaskRunner();
    initializeChatHandlerProvider();
    initializeJacksonInjections();
    initializeJacksonSubtypes();
    initializeServer();

@@ -195,9 +209,18 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
    executorLifecycle = executorLifecycleFactory.build(taskRunner, getJsonMapper());
    lifecycle.addManagedInstance(executorLifecycle);

    final Injector injector = Guice.createInjector(
        new ExecutorServletModule(
            getJsonMapper(),
            chatHandlerProvider
        )
    );
    final Context root = new Context(server, "/", Context.SESSIONS);

    root.addServlet(new ServletHolder(new StatusServlet()), "/status");
    root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
    root.addEventListener(new GuiceServletConfig(injector));
    root.addFilter(GuiceFilter.class, "/mmx/worker/v1/*", 0);
    root.addServlet(
        new ServletHolder(
            new QueryServlet(getJsonMapper(), getSmileMapper(), taskRunner, emitter, getRequestLogger())

@@ -265,7 +288,8 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
    InjectableValues.Std injectables = new InjectableValues.Std();

    injectables.addValue("s3Client", s3Service)
               .addValue("segmentPusher", segmentPusher);
               .addValue("segmentPusher", segmentPusher)
               .addValue("chatHandlerProvider", chatHandlerProvider);

    getJsonMapper().setInjectableValues(injectables);
  }

@@ -273,6 +297,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
  private void initializeJacksonSubtypes()
  {
    getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class);
    getJsonMapper().registerSubtypes(EventReceiverFirehoseFactory.class);
  }

  private void initializeHttpClient()

@@ -366,16 +391,16 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>

  public void initializeServiceDiscovery() throws Exception
  {
    final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class);
    if (serviceDiscovery == null) {
      final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class);
      this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
          getCuratorFramework(), config, lifecycle
      );
    }
  }

  public void initializeCoordinatorServiceProvider()
  {
    if (serviceAnnouncer == null) {
      final ServiceInstanceFactory instanceFactory = Initialization.makeServiceInstanceFactory(config);
      this.serviceAnnouncer = new CuratorServiceAnnouncer(serviceDiscovery, instanceFactory);
    }
    if (coordinatorServiceProvider == null) {
      this.coordinatorServiceProvider = Initialization.makeServiceProvider(
          workerConfig.getMasterService(),

@@ -401,6 +426,24 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
    }
  }

  public void initializeChatHandlerProvider()
  {
    if (chatHandlerProvider == null) {
      final ChatHandlerProviderConfig config = configFactory.build(ChatHandlerProviderConfig.class);
      final ServiceAnnouncer myServiceAnnouncer;
      if (config.getServiceFormat() == null) {
        log.info("ChatHandlerProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!");
        myServiceAnnouncer = new NoopServiceAnnouncer();
      } else {
        myServiceAnnouncer = serviceAnnouncer;
      }
      this.chatHandlerProvider = new ChatHandlerProvider(
          config,
          myServiceAnnouncer
      );
    }
  }

  public static class Builder
  {
    private ObjectMapper jsonMapper = null;
@@ -0,0 +1,44 @@
package com.metamx.druid.merger.worker.executor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.inject.Provides;
import com.metamx.druid.merger.common.index.ChatHandlerProvider;
import com.sun.jersey.guice.JerseyServletModule;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;

import javax.inject.Singleton;

public class ExecutorServletModule extends JerseyServletModule
{
  private final ObjectMapper jsonMapper;
  private final ChatHandlerProvider receivers;

  public ExecutorServletModule(
      ObjectMapper jsonMapper,
      ChatHandlerProvider receivers
  )
  {
    this.jsonMapper = jsonMapper;
    this.receivers = receivers;
  }

  @Override
  protected void configureServlets()
  {
    bind(ChatHandlerResource.class);
    bind(ObjectMapper.class).toInstance(jsonMapper);
    bind(ChatHandlerProvider.class).toInstance(receivers);

    serve("/*").with(GuiceContainer.class);
  }

  @Provides
  @Singleton
  public JacksonJsonProvider getJacksonJsonProvider()
  {
    final JacksonJsonProvider provider = new JacksonJsonProvider();
    provider.setMapper(jsonMapper);
    return provider;
  }
}
@@ -19,6 +19,7 @@

package com.metamx.druid.merger.worker.http;

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.Lists;

@@ -33,6 +34,9 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.QueryableNode;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;

@@ -41,6 +45,7 @@ import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskLogConfig;
import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.common.tasklogs.NoopTaskLogs;
import com.metamx.druid.merger.common.tasklogs.S3TaskLogs;

@@ -100,6 +105,7 @@ public class WorkerNode extends QueryableNode<WorkerNode>
  private ServiceEmitter emitter = null;
  private WorkerConfig workerConfig = null;
  private ServiceDiscovery serviceDiscovery = null;
  private ServiceAnnouncer serviceAnnouncer = null;
  private ServiceProvider coordinatorServiceProvider = null;
  private WorkerCuratorCoordinator workerCuratorCoordinator = null;
  private WorkerTaskMonitor workerTaskMonitor = null;

@@ -175,7 +181,7 @@ public class WorkerNode extends QueryableNode<WorkerNode>
    initializeMonitors();
    initializeMergerConfig();
    initializeServiceDiscovery();
    initializeCoordinatorServiceProvider();
    initializeJacksonInjections();
    initializeJacksonSubtypes();
    initializeCuratorCoordinator();
    initializePersistentTaskLogs();

@@ -255,9 +261,21 @@ public class WorkerNode extends QueryableNode<WorkerNode>
    }
  }

  private void initializeJacksonInjections()
  {
    InjectableValues.Std injectables = new InjectableValues.Std();

    injectables.addValue("s3Client", null)
               .addValue("segmentPusher", null)
               .addValue("chatHandlerProvider", null);

    getJsonMapper().setInjectableValues(injectables);
  }

  private void initializeJacksonSubtypes()
  {
    getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class);
    getJsonMapper().registerSubtypes(EventReceiverFirehoseFactory.class);
  }

  private void initializeHttpClient()

@@ -321,10 +339,6 @@ public class WorkerNode extends QueryableNode<WorkerNode>
          getLifecycle()
      );
    }
  }

  public void initializeCoordinatorServiceProvider()
  {
    if (coordinatorServiceProvider == null) {
      this.coordinatorServiceProvider = Initialization.makeServiceProvider(
          workerConfig.getMasterService(),
@@ -205,8 +205,7 @@ public class TaskSerdeTest
        null,
        null,
        new Period("PT10M"),
        IndexGranularity.HOUR,
        null
        IndexGranularity.HOUR
    );

    final ObjectMapper jsonMapper = new DefaultObjectMapper();
@@ -252,23 +252,29 @@ public class RemoteTaskRunnerTest
        new IndexerZkConfig()
        {
          @Override
          public String getAnnouncementPath()
          public String getIndexerAnnouncementPath()
          {
            return announcementsPath;
          }

          @Override
          public String getTaskPath()
          public String getIndexerTaskPath()
          {
            return tasksPath;
          }

          @Override
          public String getStatusPath()
          public String getIndexerStatusPath()
          {
            return statusPath;
          }

          @Override
          public String getZkBasePath()
          {
            throw new UnsupportedOperationException();
          }

          @Override
          public long getMaxNumBytes()
          {

@@ -375,23 +381,29 @@ public class RemoteTaskRunnerTest
  private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
  {
    @Override
    public String getAnnouncementPath()
    public String getIndexerAnnouncementPath()
    {
      return announcementsPath;
    }

    @Override
    public String getTaskPath()
    public String getIndexerTaskPath()
    {
      return tasksPath;
    }

    @Override
    public String getStatusPath()
    public String getIndexerStatusPath()
    {
      return statusPath;
    }

    @Override
    public String getZkBasePath()
    {
      throw new UnsupportedOperationException();
    }

    @Override
    public Duration getTaskAssignmentTimeoutDuration()
    {
@@ -19,6 +19,7 @@

package com.metamx.druid.merger.coordinator;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

@@ -58,8 +59,8 @@ import com.metamx.druid.merger.common.task.IndexTask;
import com.metamx.druid.merger.common.task.KillTask;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.exec.TaskConsumer;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Event;
import com.metamx.emitter.service.ServiceEmitter;

@@ -206,6 +207,9 @@ public class TaskLifecycleTest
        -1
    );

    final Optional<TaskStatus> preRunTaskStatus = tsqa.getSameGroupMergedStatus(indexTask.getId());
    Assert.assertTrue("pre run task status not present", !preRunTaskStatus.isPresent());

    final TaskStatus mergedStatus = runTask(indexTask);
    final TaskStatus status = ts.getStatus(indexTask.getId()).get();
    final List<DataSegment> publishedSegments = byIntervalOrdering.sortedCopy(mdc.getPublished());
pom.xml

@@ -23,7 +23,7 @@
  <groupId>com.metamx</groupId>
  <artifactId>druid</artifactId>
  <packaging>pom</packaging>
  <version>0.4.1-SNAPSHOT</version>
  <version>0.4.7-SNAPSHOT</version>
  <name>druid</name>
  <description>druid</description>
  <scm>

@@ -38,7 +38,7 @@

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <metamx.java-util.version>0.22.0</metamx.java-util.version>
    <metamx.java-util.version>0.22.3</metamx.java-util.version>
    <netflix.curator.version>2.0.1-21-22</netflix.curator.version>
  </properties>

@@ -51,7 +51,7 @@
    <module>merger</module>
    <module>realtime</module>
    <module>examples</module>
    <module>druid-services</module>
    <module>services</module>
  </modules>

  <dependencyManagement>

@@ -60,7 +60,7 @@
      <dependency>
        <groupId>com.metamx</groupId>
        <artifactId>emitter</artifactId>
        <version>0.2.0</version>
        <version>0.2.3</version>
      </dependency>
      <dependency>
        <groupId>com.metamx</groupId>

@@ -414,6 +414,11 @@
        <artifactId>antlr4-maven-plugin</artifactId>
        <version>4.0</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
      </plugin>
    </plugins>
  </pluginManagement>
</build>
@@ -28,7 +28,7 @@
  <parent>
    <groupId>com.metamx</groupId>
    <artifactId>druid</artifactId>
    <version>0.4.1-SNAPSHOT</version>
    <version>0.4.7-SNAPSHOT</version>
  </parent>

  <dependencies>
@@ -21,6 +21,8 @@ package com.metamx.druid.realtime;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.PlumberSchool;
@@ -36,6 +36,7 @@ import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.segment.QuerySegmentWalker;
import com.metamx.druid.query.segment.SegmentDescriptor;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.Sink;
import com.metamx.emitter.EmittingLogger;
@@ -0,0 +1,56 @@
package com.metamx.druid.realtime.firehose;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Predicate;
import com.metamx.druid.input.InputRow;
import org.joda.time.Interval;

import java.io.IOException;

/**
 * Creates firehoses clipped to a particular time interval. Useful for enforcing min time, max time, and time windows.
 */
public class ClippedFirehoseFactory implements FirehoseFactory
{
  private final FirehoseFactory delegate;
  private final Interval interval;

  @JsonCreator
  public ClippedFirehoseFactory(
      @JsonProperty("delegate") FirehoseFactory delegate,
      @JsonProperty("interval") Interval interval
  )
  {
    this.delegate = delegate;
    this.interval = interval;
  }

  @JsonProperty
  public FirehoseFactory getDelegate()
  {
    return delegate;
  }

  @JsonProperty
  public Interval getInterval()
  {
    return interval;
  }

  @Override
  public Firehose connect() throws IOException
  {
    return new PredicateFirehose(
        delegate.connect(),
        new Predicate<InputRow>()
        {
          @Override
          public boolean apply(InputRow input)
          {
            return interval.contains(input.getTimestampFromEpoch());
          }
        }
    );
  }
}
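A short usage sketch, not part of this commit: the delegate below is hypothetical, and the Joda-Time interval is half-open, so rows timestamped at or after the end instant are dropped. This composition is what replaces the RealtimeIndexTask minTime wrapping removed earlier in this commit.

    import com.metamx.druid.realtime.firehose.ClippedFirehoseFactory;
    import com.metamx.druid.realtime.firehose.FirehoseFactory;
    import org.joda.time.Interval;

    public class ClippedSketch
    {
      // Wraps any delegate factory so connect() only yields rows inside the interval.
      public static FirehoseFactory clip(FirehoseFactory delegate)
      {
        return new ClippedFirehoseFactory(delegate, new Interval("2013-01-01/2013-01-02"));
      }
    }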
@@ -17,7 +17,7 @@
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.realtime;
package com.metamx.druid.realtime.firehose;

import com.metamx.druid.input.InputRow;

@@ -28,7 +28,7 @@ import java.io.Closeable;
 * abstraction. In order to add a new type of source for realtime data ingestion, all you need to do is implement
 * one of these and register it with the RealtimeMain.
 *
 * This object acts a lot like an Iterator, but it doesn't not extend the Iterator interface because it extends
 * This object acts a lot like an Iterator, but it doesn't extend the Iterator interface because it extends
 * Closeable and it is very important that the close() method doesn't get forgotten, which is easy to do if this
 * gets passed around as an Iterator.
 * <p>
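The javadoc above is the rationale for extending Closeable; a minimal consumption sketch (the factory is hypothetical) that keeps close() from being forgotten:

    import com.metamx.druid.input.InputRow;
    import com.metamx.druid.realtime.firehose.Firehose;
    import com.metamx.druid.realtime.firehose.FirehoseFactory;

    public class ConsumeSketch
    {
      public static void consume(FirehoseFactory factory) throws Exception
      {
        final Firehose firehose = factory.connect();
        try {
          while (firehose.hasMore()) {
            final InputRow row = firehose.nextRow();
            // ... hand the row to the plumber/sink ...
          }
          firehose.commit().run(); // persist the "done up to here" marker
        }
        finally {
          firehose.close(); // the reason Firehose extends Closeable rather than Iterator
        }
      }
    }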
@@ -17,25 +17,25 @@
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.realtime;
package com.metamx.druid.realtime.firehose;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

import java.io.IOException;

/**
 */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
    @JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class)
    @JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class),
    @JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class),
    @JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class)
})
public interface FirehoseFactory
{
  /**
   * Initialization method that connects up the fire hose. If this method returns successfully it should be safe to
   * call hasMore() on the returned Firehose (which might subsequently block).
   *
   * <p/>
   * If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return
   * value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
   * invalid configuration is preferred over returning null.
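A sketch, not part of this commit, of what the @JsonTypeInfo/@JsonSubTypes registration above buys: the new "clipped" and "timed" names become usable as type tags in a firehose spec. The null delegate is used only to keep the sketch self-contained, and the exact serialized form of the Interval is an assumption.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.metamx.druid.jackson.DefaultObjectMapper;
    import com.metamx.druid.realtime.firehose.ClippedFirehoseFactory;
    import org.joda.time.Interval;

    public class SubtypeSketch
    {
      public static void main(String[] args) throws Exception
      {
        final ObjectMapper mapper = new DefaultObjectMapper();
        System.out.println(
            mapper.writeValueAsString(new ClippedFirehoseFactory(null, new Interval("2013-01-01/2013-01-02")))
        );
        // Expected shape (assumption): {"type":"clipped","delegate":null,"interval":"..."}
      }
    }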
@@ -17,7 +17,7 @@
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.realtime;
package com.metamx.druid.realtime.firehose;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

@@ -17,7 +17,7 @@
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.realtime;
package com.metamx.druid.realtime.firehose;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -1,26 +1,25 @@
package com.metamx.druid.realtime;
package com.metamx.druid.realtime.firehose;

import com.google.common.base.Predicate;
import com.metamx.druid.input.InputRow;
import org.joda.time.DateTime;

import java.io.IOException;
import java.util.NoSuchElementException;

/**
 * Provides a view on a firehose that only returns rows at or after a certain minimum timestamp.
 * Provides a view on a firehose that only returns rows that match a certain predicate.
 * Not thread-safe.
 */
public class MinTimeFirehose implements Firehose
public class PredicateFirehose implements Firehose
{
  private final Firehose firehose;
  private final DateTime minTime;
  private final Predicate<InputRow> predicate;

  private InputRow savedInputRow = null;

  public MinTimeFirehose(Firehose firehose, DateTime minTime)
  public PredicateFirehose(Firehose firehose, Predicate<InputRow> predicate)
  {
    this.firehose = firehose;
    this.minTime = minTime;
    this.predicate = predicate;
  }

  @Override

@@ -32,7 +31,7 @@ public class MinTimeFirehose

    while (firehose.hasMore()) {
      final InputRow row = firehose.nextRow();
      if (acceptable(row)) {
      if (predicate.apply(row)) {
        savedInputRow = row;
        return true;
      }

@@ -60,9 +59,4 @@ public class MinTimeFirehose
  {
    firehose.close();
  }

  private boolean acceptable(InputRow row)
  {
    return row.getTimestampFromEpoch() >= minTime.getMillis();
  }
}
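PredicateFirehose generalizes the old minTime check to any Guava predicate. A sketch (hypothetical delegate) reproducing the removed MinTimeFirehose behavior, using the same comparison as the deleted acceptable() helper:

    import com.google.common.base.Predicate;
    import com.metamx.druid.input.InputRow;
    import com.metamx.druid.realtime.firehose.Firehose;
    import com.metamx.druid.realtime.firehose.PredicateFirehose;
    import org.joda.time.DateTime;

    public class MinTimeSketch
    {
      public static Firehose atOrAfter(Firehose delegate, final DateTime minTime)
      {
        return new PredicateFirehose(
            delegate,
            new Predicate<InputRow>()
            {
              @Override
              public boolean apply(InputRow row)
              {
                return row.getTimestampFromEpoch() >= minTime.getMillis();
              }
            }
        );
      }
    }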
@@ -0,0 +1,122 @@
package com.metamx.druid.realtime.firehose;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.druid.input.InputRow;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Creates firehoses that shut off at a particular time. Useful for limiting the lifespan of a realtime job.
 */
public class TimedShutoffFirehoseFactory implements FirehoseFactory
{
  private static final EmittingLogger log = new EmittingLogger(FirehoseFactory.class);
  private final FirehoseFactory delegateFactory;
  private final DateTime shutoffTime;

  @JsonCreator
  public TimedShutoffFirehoseFactory(
      @JsonProperty("delegate") FirehoseFactory delegateFactory,
      @JsonProperty("shutoffTime") DateTime shutoffTime
  )
  {
    this.delegateFactory = delegateFactory;
    this.shutoffTime = shutoffTime;
  }

  @Override
  public Firehose connect() throws IOException
  {
    return new TimedShutoffFirehose();
  }

  public class TimedShutoffFirehose implements Firehose
  {
    private final Firehose firehose;
    private final ScheduledExecutorService exec;
    private final Object shutdownLock = new Object();
    private volatile boolean shutdown = false;

    public TimedShutoffFirehose() throws IOException
    {
      firehose = delegateFactory.connect();

      exec = Executors.newScheduledThreadPool(
          1,
          new ThreadFactoryBuilder().setDaemon(true)
                                    .setNameFormat("timed-shutoff-firehose-%d")
                                    .build()
      );

      exec.schedule(
          new Runnable()
          {
            @Override
            public void run()
            {
              log.info("Closing delegate firehose.");

              shutdown = true;
              try {
                firehose.close();
              } catch (IOException e) {
                log.warn(e, "Failed to close delegate firehose, ignoring.");
              }
            }
          },
          shutoffTime.getMillis() - System.currentTimeMillis(),
          TimeUnit.MILLISECONDS
      );

      log.info("Firehose created, will shut down at: %s", shutoffTime);
    }

    @Override
    public boolean hasMore()
    {
      return firehose.hasMore();
    }

    @Override
    public InputRow nextRow()
    {
      return firehose.nextRow();
    }

    @Override
    public Runnable commit()
    {
      return firehose.commit();
    }

    @Override
    public void close() throws IOException
    {
      synchronized (shutdownLock) {
        if (!shutdown) {
          shutdown = true;
          firehose.close();
        }
      }
    }
  }

  @JsonProperty("delegate")
  public FirehoseFactory getDelegateFactory()
  {
    return delegateFactory;
  }

  @JsonProperty("shutoffTime")
  public DateTime getShutoffTime()
  {
    return shutoffTime;
  }
}
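Sketch of limiting a realtime job's lifespan with the factory above (the delegate and shutoff instant are hypothetical). Once the scheduled thread fires, the delegate is closed and the wrapped firehose is expected to wind down on its own:

    import com.metamx.druid.realtime.firehose.FirehoseFactory;
    import com.metamx.druid.realtime.firehose.TimedShutoffFirehoseFactory;
    import org.joda.time.DateTime;

    public class ShutoffSketch
    {
      // Any firehose built from the returned factory stops delivering rows after shutoffTime.
      public static FirehoseFactory shutOffAt(FirehoseFactory delegate, DateTime shutoffTime)
      {
        return new TimedShutoffFirehoseFactory(delegate, shutoffTime);
      }
    }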
@@ -28,7 +28,7 @@
  <parent>
    <groupId>com.metamx</groupId>
    <artifactId>druid</artifactId>
    <version>0.4.1-SNAPSHOT</version>
    <version>0.4.7-SNAPSHOT</version>
  </parent>

  <dependencies>
@@ -39,6 +39,7 @@ import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.config.ConfigManager;
import com.metamx.druid.config.ConfigManagerConfig;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseRuleManagerConfig;
import com.metamx.druid.db.DatabaseSegmentManager;

@@ -174,6 +175,10 @@ public class MasterMain
        serviceDiscoveryConfig,
        lifecycle
    );
    final ServiceAnnouncer serviceAnnouncer = Initialization.makeServiceAnnouncer(
        serviceDiscoveryConfig, serviceDiscovery
    );
    Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, lifecycle);

    IndexingServiceClient indexingServiceClient = null;
    if (druidMasterConfig.getMergerServiceName() != null) {
@ -22,7 +22,6 @@ package com.metamx.druid.query.group;
|
|||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
@ -41,6 +40,7 @@ import com.metamx.druid.query.QueryRunner;
|
|||
import com.metamx.druid.query.QueryRunnerTestHelper;
|
||||
import com.metamx.druid.query.dimension.DefaultDimensionSpec;
|
||||
import com.metamx.druid.query.dimension.DimensionSpec;
|
||||
import com.metamx.druid.query.group.limit.OrderByColumnSpec;
|
||||
import com.metamx.druid.query.segment.MultipleIntervalSegmentSpec;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
@ -126,7 +126,7 @@ public class GroupByQueryRunnerTest
|
|||
.setGranularity(QueryRunnerTestHelper.dayGran)
|
||||
.build();
|
||||
|
||||
List<Row> expectedResults = Arrays.<Row>asList(
|
||||
List<Row> expectedResults = Arrays.asList(
|
||||
createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
|
||||
createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
|
||||
createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L),
|
||||
|
@@ -187,25 +187,25 @@ public class GroupByQueryRunnerTest
         .build();
 
     List<Row> expectedResults = Arrays.asList(
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "automotive", "rows", 1L, "idx", 135L)),
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "business", "rows", 1L, "idx", 118L)),
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "entertainment", "rows", 1L, "idx", 158L)),
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "health", "rows", 1L, "idx", 120L)),
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "mezzanine", "rows", 3L, "idx", 2870L)),
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "news", "rows", 1L, "idx", 121L)),
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "premium", "rows", 3L, "idx", 2900L)),
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "technology", "rows", 1L, "idx", 78L)),
-        (Row) new MapBasedRow(new DateTime("2011-03-31", tz), ImmutableMap.<String, Object>of("alias", "travel", "rows", 1L, "idx", 119L)),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "automotive", "rows", 1L, "idx", 135L),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "business", "rows", 1L, "idx", 118L),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "entertainment", "rows", 1L, "idx", 158L),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "health", "rows", 1L, "idx", 120L),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "mezzanine", "rows", 3L, "idx", 2870L),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "news", "rows", 1L, "idx", 121L),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "premium", "rows", 3L, "idx", 2900L),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "technology", "rows", 1L, "idx", 78L),
+        createExpectedRow(new DateTime("2011-03-31", tz), "alias", "travel", "rows", 1L, "idx", 119L),
 
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "automotive", "rows", 1L, "idx", 147L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "business", "rows", 1L, "idx", 112L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "entertainment", "rows", 1L, "idx", 166L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "health", "rows", 1L, "idx", 113L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "mezzanine", "rows", 3L, "idx", 2447L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "news", "rows", 1L, "idx", 114L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "premium", "rows", 3L, "idx", 2505L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "technology", "rows", 1L, "idx", 97L)),
-        (Row) new MapBasedRow(new DateTime("2011-04-01", tz), ImmutableMap.<String, Object>of("alias", "travel", "rows", 1L, "idx", 126L))
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "automotive", "rows", 1L, "idx", 147L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "business", "rows", 1L, "idx", 112L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "entertainment", "rows", 1L, "idx", 166L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "health", "rows", 1L, "idx", 113L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "mezzanine", "rows", 3L, "idx", 2447L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "news", "rows", 1L, "idx", 114L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "premium", "rows", 3L, "idx", 2505L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "technology", "rows", 1L, "idx", 97L),
+        createExpectedRow(new DateTime("2011-04-01", tz), "alias", "travel", "rows", 1L, "idx", 126L)
     );
 
     Iterable<Row> results = Sequences.toList(

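The expected rows above land on 2011-03-31 because day-sized buckets are computed in the query's time zone tz, which is defined earlier in this test, outside the hunk. A small sketch of the effect with Joda-Time; the zone America/Los_Angeles is an assumption for illustration, not necessarily the test's tz:

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

public class TzBucketDemo
{
  public static void main(String[] args)
  {
    DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles"); // assumed zone
    DateTime utcInstant = new DateTime("2011-04-01T05:00:00Z");
    // The same instant falls on the previous calendar day in tz, so a
    // day-granularity bucket keyed in tz starts at 2011-03-31 local midnight.
    DateTime localDayStart = utcInstant.withZone(tz).withTimeAtStartOfDay();
    System.out.println(localDayStart); // 2011-03-31T00:00:00.000-07:00
  }
}
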
@@ -252,7 +252,7 @@ public class GroupByQueryRunnerTest
       }
     );
 
-    List<Row> expectedResults = Arrays.<Row>asList(
+    List<Row> expectedResults = Arrays.asList(
         createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 269L),
         createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L),
         createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 319L),

@@ -267,7 +267,7 @@ public class GroupByQueryRunnerTest
     TestHelper.assertExpectedObjects(expectedResults, runner.run(fullQuery), "direct");
     TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
 
-    List<Row> allGranExpectedResults = Arrays.<Row>asList(
+    List<Row> allGranExpectedResults = Arrays.asList(
         createExpectedRow("2011-04-02", "alias", "automotive", "rows", 2L, "idx", 269L),
         createExpectedRow("2011-04-02", "alias", "business", "rows", 2L, "idx", 217L),
         createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 2L, "idx", 319L),

@@ -284,7 +284,92 @@ public class GroupByQueryRunnerTest
 
   }
 
-  private MapBasedRow createExpectedRow(final String timestamp, Object... vals)
+  @Test
+  public void testGroupByOrderLimit() throws Exception
+  {
+    GroupByQuery.Builder builder = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setInterval("2011-04-02/2011-04-04")
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+        .setAggregatorSpecs(
+            Arrays.<AggregatorFactory>asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory("idx", "index")
+            )
+        )
+        .addOrderByColumn("rows")
+        .addOrderByColumn("alias", OrderByColumnSpec.Direction.DESCENDING)
+        .setGranularity(new PeriodGranularity(new Period("P1M"), null, null));
+
+    final GroupByQuery query = builder.build();
+
+    List<Row> expectedResults = Arrays.asList(
+        createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L),
+        createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 177L),
+        createExpectedRow("2011-04-01", "alias", "news", "rows", 2L, "idx", 221L),
+        createExpectedRow("2011-04-01", "alias", "health", "rows", 2L, "idx", 216L),
+        createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 319L),
+        createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L),
+        createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 269L),
+        createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L),
+        createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L)
+    );
+
+    QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest().mergeResults(runner);
+    TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
+
+    TestHelper.assertExpectedObjects(
+        Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build()), "limited"
+    );
+  }
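Conceptually, the ordering this test asserts ("rows" ascending, then "alias" descending) is a composed comparator over the result rows. A standalone sketch of that composition follows; it is illustrative only, not Druid's OrderByColumnSpec implementation:

import java.util.*;

public class OrderByDemo
{
  // "rows" ascending, then "alias" descending, over map-shaped rows.
  static final Comparator<Map<String, Object>> ORDER = new Comparator<Map<String, Object>>()
  {
    @Override
    public int compare(Map<String, Object> a, Map<String, Object> b)
    {
      int c = ((Long) a.get("rows")).compareTo((Long) b.get("rows")); // ascending
      if (c != 0) {
        return c;
      }
      return ((String) b.get("alias")).compareTo((String) a.get("alias")); // descending
    }
  };

  public static void main(String[] args)
  {
    List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>();
    rows.add(row("mezzanine", 6L));
    rows.add(row("travel", 2L));
    rows.add(row("premium", 6L));
    Collections.sort(rows, ORDER);
    System.out.println(rows); // travel (2 rows) first; then premium before mezzanine
  }

  static Map<String, Object> row(String alias, long n)
  {
    Map<String, Object> m = new HashMap<String, Object>();
    m.put("alias", alias);
    m.put("rows", n);
    return m;
  }
}

Sorting with this comparator reproduces the expected order above: the 2-row aliases first, then premium before mezzanine, because descending alias order puts the lexicographically larger string first.
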
+
+  @Test
+  public void testGroupByWithOrderLimit2() throws Exception
+  {
+    GroupByQuery.Builder builder = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setInterval("2011-04-02/2011-04-04")
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+        .setAggregatorSpecs(
+            Arrays.<AggregatorFactory>asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory("idx", "index")
+            )
+        )
+        .addOrderByColumn("rows", "desc")
+        .addOrderByColumn("alias", "d")
+        .setGranularity(new PeriodGranularity(new Period("P1M"), null, null));
+
+    final GroupByQuery query = builder.build();
+
+    List<Row> expectedResults = Arrays.asList(
+        createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L),
+        createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
+        createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L),
+        createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 177L),
+        createExpectedRow("2011-04-01", "alias", "news", "rows", 2L, "idx", 221L),
+        createExpectedRow("2011-04-01", "alias", "health", "rows", 2L, "idx", 216L),
+        createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 319L),
+        createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L),
+        createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 269L)
+    );
+
+    QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest().mergeResults(runner);
+    TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
+    TestHelper.assertExpectedObjects(
+        Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build()), "limited"
+    );
+
+  }
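This second test passes the direction as the strings "desc" and "d" rather than the enum constant used above, which suggests the builder resolves directions by case-insensitive prefix match against the Direction names. A hypothetical sketch of such resolution; the real OrderByColumnSpec parsing may differ:

public class DirectionDemo
{
  enum Direction { ASCENDING, DESCENDING }

  // Hypothetical: match a user-supplied string as a prefix of an enum name.
  static Direction fromString(String name)
  {
    String upper = name.toUpperCase();
    for (Direction candidate : Direction.values()) {
      if (candidate.name().startsWith(upper)) {
        return candidate;
      }
    }
    throw new IllegalArgumentException("Unknown direction: " + name);
  }

  public static void main(String[] args)
  {
    System.out.println(fromString("desc")); // DESCENDING
    System.out.println(fromString("d"));    // DESCENDING
    System.out.println(fromString("asc"));  // ASCENDING
  }
}
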
+  private Row createExpectedRow(final String timestamp, Object... vals)
+  {
+    return createExpectedRow(new DateTime(timestamp), vals);
+  }
+
+  private Row createExpectedRow(final DateTime timestamp, Object... vals)
   {
     Preconditions.checkArgument(vals.length % 2 == 0);
 
@@ -293,6 +378,6 @@ public class GroupByQueryRunnerTest
       theVals.put(vals[i].toString(), vals[i+1]);
     }
 
-    return new MapBasedRow(new DateTime(timestamp), theVals);
+    return new MapBasedRow(timestamp, theVals);
   }
 }

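Assembled from the fragments in the hunks above, the reworked helper plausibly reads as follows. Only the lines marked in the diff are confirmed; the map construction and the loop header are not visible here, so Maps.newHashMap and the loop bounds are assumptions:

  private Row createExpectedRow(final String timestamp, Object... vals)
  {
    return createExpectedRow(new DateTime(timestamp), vals);
  }

  private Row createExpectedRow(final DateTime timestamp, Object... vals)
  {
    Preconditions.checkArgument(vals.length % 2 == 0);

    Map<String, Object> theVals = Maps.newHashMap(); // assumed, per Guava style used elsewhere
    for (int i = 0; i < vals.length; i += 2) {       // loop header assumed
      theVals.put(vals[i].toString(), vals[i + 1]);
    }

    return new MapBasedRow(timestamp, theVals);
  }
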
@@ -34,7 +34,6 @@ import com.metamx.druid.query.timeseries.TimeseriesQuery;
 import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerTest;
 import com.metamx.druid.result.Result;
 import com.metamx.druid.result.TimeseriesResultValue;
 import org.joda.time.DateTime;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;

@@ -48,6 +47,7 @@ import java.util.Collection;
 @RunWith(Parameterized.class)
 public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
 {
+  @SuppressWarnings("unchecked")
   @Parameterized.Parameters
   public static Collection<?> constructorFeeder() throws IOException
   {

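constructorFeeder() is the standard JUnit @Parameterized.Parameters factory: each Object[] it returns becomes one constructor invocation of the test class, so every discovered QueryRunner drives a full run of the inherited timeseries tests. A runnable sketch of the typical shape; the actual method body lies outside this hunk, and the string runners are stand-ins:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class FeederDemo
{
  public static Collection<?> constructorFeeder()
  {
    final List<Object[]> constructors = new ArrayList<Object[]>();
    // Stand-ins for the QueryRunners the real feeder would discover.
    for (Object runner : Arrays.asList("runnerA", "runnerB")) {
      constructors.add(new Object[]{runner}); // one Object[] per test instantiation
    }
    return constructors;
  }

  public static void main(String[] args)
  {
    System.out.println(constructorFeeder().size()); // 2
  }
}
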
@@ -24,11 +24,11 @@
     <artifactId>druid-services</artifactId>
     <name>druid-services</name>
     <description>druid-services</description>
-    <version>0.4.1-SNAPSHOT</version>
+    <version>0.4.7-SNAPSHOT</version>
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.4.1-SNAPSHOT</version>
+        <version>0.4.7-SNAPSHOT</version>
     </parent>
 
     <dependencies>

@@ -42,7 +42,6 @@
             <artifactId>druid-server</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
 
     </dependencies>
     <build>
         <plugins>

@@ -64,6 +63,23 @@
                 </execution>
             </executions>
         </plugin>
+        <plugin>
+            <artifactId>maven-assembly-plugin</artifactId>
+            <executions>
+                <execution>
+                    <id>distro-assembly</id>
+                    <phase>package</phase>
+                    <goals>
+                        <goal>single</goal>
+                    </goals>
+                    <configuration>
+                        <descriptors>
+                            <descriptor>src/assembly/assembly.xml</descriptor>
+                        </descriptors>
+                    </configuration>
+                </execution>
+            </executions>
+        </plugin>
     </plugins>
     </build>
 </project>

@@ -0,0 +1,60 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+    <id>bin</id>
+    <formats>
+        <format>tar.gz</format>
+    </formats>
+    <fileSets>
+        <fileSet>
+            <directory>../examples/config</directory>
+            <includes>
+                <include>*</include>
+            </includes>
+            <outputDirectory>config</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>../examples/bin</directory>
+            <includes>
+                <include>*sh</include>
+            </includes>
+            <fileMode>744</fileMode>
+            <outputDirectory>/</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>../examples/target</directory>
+            <includes>
+                <include>druid-examples-*-selfcontained.jar</include>
+            </includes>
+            <outputDirectory>lib</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>../examples/bin/examples</directory>
+            <includes>
+                <include>**</include>
+            </includes>
+            <outputDirectory>examples</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>../examples/bin/examples/twitter</directory>
+            <includes>
+                <include>*sh</include>
+            </includes>
+            <fileMode>744</fileMode>
+            <outputDirectory>examples/twitter</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>../</directory>
+            <includes>
+                <include>LICENSE</include>
+            </includes>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>true</useProjectArtifact>
+            <useTransitiveDependencies>true</useTransitiveDependencies>
+            <outputDirectory>lib</outputDirectory>
+        </dependencySet>
+    </dependencySets>
+</assembly>

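With the plugin execution above bound to the package phase and this descriptor, running `mvn package` in the services module should produce a tar.gz carrying the `bin` assembly id under `target/`, bundling the example configs and scripts alongside the module's transitive dependency jars in `lib/` (assuming standard maven-assembly-plugin behavior for the `single` goal).
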
@@ -0,0 +1 @@
+echo "Hello World!"