1) Broker works with Guice

2) Extract ServerViewModule to handle various ServerView thingies
3) Extract QueryToolChestModule and QueryRunnerFactoryModule to reuse code for configuration of Query stuff
4) Extract QueryJettyServerInitializer to reuse between Historical and Broker nodes
5) Remove ClientMain, BrokerMain and BrokerNode
This commit is contained in:
cheddar 2013-08-06 12:01:52 -07:00
parent 293c0e8418
commit 9f71d42314
37 changed files with 508 additions and 658 deletions

View File

@ -108,6 +108,14 @@
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>

View File

@ -22,10 +22,13 @@ package com.metamx.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.selector.QueryableDruidServer;
import com.metamx.druid.client.selector.ServerSelector;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.guice.annotations.Client;
import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryToolChestWarehouse;
@ -54,12 +57,12 @@ public class BrokerServerView implements TimelineServerView
private final HttpClient httpClient;
private final ServerView baseView;
@Inject
public BrokerServerView(
QueryToolChestWarehouse warehose,
ObjectMapper smileMapper,
HttpClient httpClient,
ServerView baseView,
ExecutorService exec
@Client HttpClient httpClient,
ServerView baseView
)
{
this.warehose = warehose;
@ -71,6 +74,7 @@ public class BrokerServerView implements TimelineServerView
this.selectors = Maps.newHashMap();
this.timelines = Maps.newHashMap();
ExecutorService exec = Execs.singleThreaded("BrokerServerView-%s");
baseView.registerSegmentCallback(
exec,
new ServerView.SegmentCallback()

View File

@ -32,6 +32,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.LazySequence;
@ -79,6 +80,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
private final Cache cache;
private final ObjectMapper objectMapper;
@Inject
public CachingClusteredClient(
QueryToolChestWarehouse warehouse,
TimelineServerView serverView,

View File

@ -1,11 +0,0 @@
package com.metamx.druid.client.cache;
import org.skife.config.Config;
import org.skife.config.Default;
public abstract class CacheConfig
{
@Config("druid.bard.cache.type")
@Default("local")
public abstract String getType();
}

View File

@ -0,0 +1,14 @@
package com.metamx.druid.client.cache;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.inject.Provider;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = LocalCacheProvider.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "local", value = LocalCacheProvider.class),
@JsonSubTypes.Type(name = "memcached", value = MemcachedCacheProvider.class)
})
public interface CacheProvider extends Provider<Cache>
{
}

View File

@ -19,26 +19,30 @@
package com.metamx.druid.client.cache;
import org.skife.config.Config;
import org.skife.config.Default;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.Min;
/**
*/
public abstract class MapCacheConfig
public class LocalCacheProvider implements CacheProvider
{
@Config("${prefix}.sizeInBytes")
@Default("0")
public abstract long getSizeInBytes();
@JsonProperty
@Min(0)
private long sizeInBytes = 0;
@Config("${prefix}.initialSize")
public int getInitialSize()
{
return 500000;
}
@JsonProperty
@Min(0)
private int initialSize = 500000;
@Config("${prefix}.logEvictionCount")
public int getLogEvictionCount()
@JsonProperty
@Min(0)
private int logEvictionCount = 0;
@Override
public Cache get()
{
return 0;
return new MapCache(new ByteCountingLRUMap(initialSize, logEvictionCount, sizeInBytes));
}
}

View File

@ -44,17 +44,6 @@ public class MapCache implements Cache
private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong missCount = new AtomicLong(0);
public static com.metamx.druid.client.cache.Cache create(final MapCacheConfig config)
{
return new MapCache(
new ByteCountingLRUMap(
config.getInitialSize(),
config.getLogEvictionCount(),
config.getSizeInBytes()
)
);
}
MapCache(
ByteCountingLRUMap byteCountingLRUMap
)

View File

@ -49,7 +49,7 @@ public class MemcachedCache implements Cache
{
private static final Logger log = new Logger(MemcachedCache.class);
public static MemcachedCache create(final MemcachedCacheConfig config)
public static MemcachedCache create(final MemcachedCacheConfiger config)
{
try {
LZ4Transcoder transcoder = new LZ4Transcoder(config.getMaxObjectSize());

View File

@ -1,26 +0,0 @@
package com.metamx.druid.client.cache;
import org.skife.config.Config;
import org.skife.config.Default;
public abstract class MemcachedCacheConfig
{
@Config("${prefix}.expiration")
@Default("2592000")
public abstract int getExpiration();
@Config("${prefix}.timeout")
@Default("500")
public abstract int getTimeout();
@Config("${prefix}.hosts")
public abstract String getHosts();
@Config("${prefix}.maxObjectSize")
@Default("52428800")
public abstract int getMaxObjectSize();
@Config("${prefix}.memcachedPrefix")
@Default("druid")
public abstract String getMemcachedPrefix();
}

View File

@ -0,0 +1,49 @@
package com.metamx.druid.client.cache;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
public class MemcachedCacheConfiger
{
@JsonProperty
private int expiration = 2592000; // What is this number?
@JsonProperty
private int timeout = 500;
@JsonProperty
@NotNull
private String hosts;
@JsonProperty
private int maxObjectSize = 50 * 1024 * 1024;
@JsonProperty
private String memcachedPrefix = "druid";
public int getExpiration()
{
return expiration;
}
public int getTimeout()
{
return timeout;
}
public String getHosts()
{
return hosts;
}
public int getMaxObjectSize()
{
return maxObjectSize;
}
public String getMemcachedPrefix()
{
return memcachedPrefix;
}
}

View File

@ -0,0 +1,10 @@
package com.metamx.druid.client.cache;
public class MemcachedCacheProvider extends MemcachedCacheConfiger implements CacheProvider
{
@Override
public Cache get()
{
return MemcachedCache.create(this);
}
}

View File

@ -0,0 +1,28 @@
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.druid.client.BrokerServerView;
import com.metamx.druid.client.CachingClusteredClient;
import com.metamx.druid.client.TimelineServerView;
import com.metamx.druid.client.cache.Cache;
import com.metamx.druid.client.cache.CacheProvider;
import com.metamx.druid.query.MapQueryToolChestWarehouse;
import com.metamx.druid.query.QueryToolChestWarehouse;
/**
*/
public class BrokerModule implements Module
{
@Override
public void configure(Binder binder)
{
binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);
binder.bind(CachingClusteredClient.class).in(LazySingleton.class);
binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class);
binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class);
}
}

View File

@ -0,0 +1,22 @@
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.druid.client.InventoryView;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewProvider;
import com.metamx.druid.client.ServerView;
/**
*/
public class ServerViewModule implements Module
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.announcer", ServerInventoryViewProvider.class);
binder.bind(InventoryView.class).to(ServerInventoryView.class);
binder.bind(ServerView.class).to(ServerInventoryView.class);
binder.bind(ServerInventoryView.class).toProvider(ServerInventoryViewProvider.class).in(ManageLifecycle.class);
}
}

View File

@ -0,0 +1,17 @@
package com.metamx.druid.guice.annotations;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
*/
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface Client
{
}

View File

@ -1,53 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.http;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.log.LogLevelAdjuster;
/**
*/
public class BrokerMain
{
private static final Logger log = new Logger(BrokerMain.class);
public static void main(String[] args) throws Exception
{
LogLevelAdjuster.register();
Lifecycle lifecycle = new Lifecycle();
lifecycle.addManagedInstance(
BrokerNode.builder().build()
);
try {
lifecycle.start();
}
catch (Throwable t) {
log.info(t, "Throwable caught at startup, committing seppuku");
System.exit(2);
}
lifecycle.join();
}
}

View File

@ -1,355 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.ISE;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.QueryableNode;
import com.metamx.druid.client.BrokerServerView;
import com.metamx.druid.client.CachingClusteredClient;
import com.metamx.druid.client.cache.Cache;
import com.metamx.druid.client.cache.CacheConfig;
import com.metamx.druid.client.cache.CacheMonitor;
import com.metamx.druid.client.cache.MapCache;
import com.metamx.druid.client.cache.MapCacheConfig;
import com.metamx.druid.client.cache.MemcachedCache;
import com.metamx.druid.client.cache.MemcachedCacheConfig;
import com.metamx.druid.curator.CuratorConfig;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.query.QueryToolChestWarehouse;
import com.metamx.druid.query.ReflectionQueryToolChestWarehouse;
import com.metamx.druid.utils.PropUtils;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.metrics.Monitor;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
import org.skife.config.ConfigurationObjectFactory;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*/
public class BrokerNode extends QueryableNode<BrokerNode>
{
private static final Logger log = new Logger(BrokerNode.class);
public static final String CACHE_TYPE_LOCAL = "local";
public static final String CACHE_TYPE_MEMCACHED = "memcached";
public static final String CACHE_PROPERTY_PREFIX = "druid.bard.cache";
private final List<Module> extraModules = Lists.newArrayList();
private final List<String> pathsForGuiceFilter = Lists.newArrayList();
private QueryToolChestWarehouse warehouse = null;
private HttpClient brokerHttpClient = null;
private Cache cache = null;
private boolean useDiscovery = true;
public static Builder builder()
{
return new Builder();
}
public BrokerNode(
Properties props,
Lifecycle lifecycle,
ObjectMapper jsonMapper,
ObjectMapper smileMapper,
ConfigurationObjectFactory configFactory
)
{
super("broker", log, props, lifecycle, jsonMapper, smileMapper, configFactory);
}
public QueryToolChestWarehouse getWarehouse()
{
initializeWarehouse();
return warehouse;
}
public BrokerNode setWarehouse(QueryToolChestWarehouse warehouse)
{
checkFieldNotSetAndSet("warehouse", warehouse);
return this;
}
public HttpClient getBrokerHttpClient()
{
initializeBrokerHttpClient();
return brokerHttpClient;
}
public BrokerNode setBrokerHttpClient(HttpClient brokerHttpClient)
{
checkFieldNotSetAndSet("brokerHttpClient", brokerHttpClient);
return this;
}
public Cache getCache()
{
initializeCacheBroker();
return cache;
}
public BrokerNode setCache(Cache cache)
{
checkFieldNotSetAndSet("cache", cache);
return this;
}
public BrokerNode useDiscovery(boolean useDiscovery)
{
this.useDiscovery = useDiscovery;
return this;
}
/**
* This method allows you to specify more Guice modules to use primarily for injected extra Jersey resources.
* I'd like to remove the Guice dependency for this, but I don't know how to set up Jersey without Guice...
*
* This is deprecated because at some point in the future, we will eliminate the Guice dependency and anything
* that uses this will break. Use at your own risk.
*
* @param module the module to register with Guice
*
* @return this
*/
@Deprecated
public BrokerNode addModule(Module module)
{
extraModules.add(module);
return this;
}
/**
* This method is used to specify extra paths that the GuiceFilter should pay attention to.
*
* This is deprecated for the same reason that addModule is deprecated.
*
* @param path the path that the GuiceFilter should pay attention to.
*
* @return this
*/
@Deprecated
public BrokerNode addPathForGuiceFilter(String path)
{
pathsForGuiceFilter.add(path);
return this;
}
@Override
protected void doInit() throws Exception
{
initializeWarehouse();
initializeBrokerHttpClient();
initializeCacheBroker();
initializeDiscovery();
final Lifecycle lifecycle = getLifecycle();
final List<Monitor> monitors = getMonitors();
monitors.add(new CacheMonitor(cache));
startMonitoring(monitors);
final ExecutorService viewExec = Executors.newFixedThreadPool(
1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("BrokerServerView-%s").build()
);
final BrokerServerView view = new BrokerServerView(
warehouse, getSmileMapper(), brokerHttpClient, getServerView(), viewExec
);
final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cache, getSmileMapper());
lifecycle.addManagedInstance(baseClient);
final ClientQuerySegmentWalker texasRanger = new ClientQuerySegmentWalker(warehouse, getEmitter(), baseClient);
List<Module> theModules = Lists.newArrayList();
theModules.add(new ClientServletModule(texasRanger, getInventoryView(), getJsonMapper()));
theModules.addAll(extraModules);
final Injector injector = Guice.createInjector(theModules);
final ServletContextHandler root = new ServletContextHandler();
root.setContextPath("/");
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(
new ServletHolder(
new QueryServlet(getJsonMapper(), getSmileMapper(), texasRanger, getEmitter(), getRequestLogger())
),
"/druid/v2/*"
);
root.addFilter(GzipFilter.class, "/*", null);
root.addEventListener(new GuiceServletConfig(injector));
root.addFilter(GuiceFilter.class, "/druid/v2/datasources/*", null);
for (String path : pathsForGuiceFilter) {
root.addFilter(GuiceFilter.class, path, null);
}
}
private void initializeDiscovery() throws Exception
{
if (useDiscovery) {
final Lifecycle lifecycle = getLifecycle();
final CuratorDiscoveryConfig curatorDiscoveryConfig = getConfigFactory().build(CuratorDiscoveryConfig.class);
final DruidNode nodeConfig = getConfigFactory().build(DruidNode.class);
final CuratorFramework curatorFramework = Initialization.makeCuratorFramework(
getConfigFactory().build(CuratorConfig.class), lifecycle
);
final ServiceDiscovery<Void> serviceDiscovery = Initialization.makeServiceDiscoveryClient(
curatorFramework, curatorDiscoveryConfig, lifecycle
);
final ServiceAnnouncer serviceAnnouncer = Initialization.makeServiceAnnouncer(serviceDiscovery);
Initialization.announceDefaultService(nodeConfig, serviceAnnouncer, lifecycle);
}
}
private void initializeCacheBroker()
{
if (cache == null) {
String cacheType = getConfigFactory()
.build(CacheConfig.class)
.getType();
if (cacheType.equals(CACHE_TYPE_LOCAL)) {
setCache(
MapCache.create(
getConfigFactory().buildWithReplacements(
MapCacheConfig.class,
ImmutableMap.of("prefix", CACHE_PROPERTY_PREFIX)
)
)
);
} else if (cacheType.equals(CACHE_TYPE_MEMCACHED)) {
setCache(
MemcachedCache.create(
getConfigFactory().buildWithReplacements(
MemcachedCacheConfig.class,
ImmutableMap.of("prefix", CACHE_PROPERTY_PREFIX)
)
)
);
} else {
throw new ISE("Unknown cache type [%s]", cacheType);
}
}
}
private void initializeBrokerHttpClient()
{
if (brokerHttpClient == null) {
setBrokerHttpClient(
HttpClientInit.createClient(
HttpClientConfig
.builder()
.withNumConnections(PropUtils.getPropertyAsInt(getProps(), "druid.client.http.connections"))
.build(),
getLifecycle()
)
);
}
}
private void initializeWarehouse()
{
if (warehouse == null) {
setWarehouse(new ReflectionQueryToolChestWarehouse());
}
}
public static class Builder
{
private ObjectMapper jsonMapper = null;
private ObjectMapper smileMapper = null;
private Lifecycle lifecycle = null;
private Properties props = null;
private ConfigurationObjectFactory configFactory = null;
public Builder withMappers(ObjectMapper jsonMapper, ObjectMapper smileMapper)
{
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
return this;
}
public Builder withProps(Properties props)
{
this.props = props;
return this;
}
public Builder withConfigFactory(ConfigurationObjectFactory configFactory)
{
this.configFactory = configFactory;
return this;
}
public BrokerNode build()
{
if (jsonMapper == null && smileMapper == null) {
jsonMapper = new DefaultObjectMapper();
smileMapper = new DefaultObjectMapper(new SmileFactory());
smileMapper.getJsonFactory().setCodec(smileMapper);
}
else if (jsonMapper == null || smileMapper == null) {
throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper);
}
if (lifecycle == null) {
lifecycle = new Lifecycle();
}
if (props == null) {
props = Initialization.loadProperties();
}
if (configFactory == null) {
configFactory = Config.createFactory(props);
}
return new BrokerNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
}
}
}

View File

@ -1,41 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.http;
/**
*/
@Deprecated
public class ClientMain
{
public static void main(String[] args) throws Exception
{
System.out.println("!@(*&$#!(*@(@*&$! You are running with ClientMain!!!! PLZ Stop. Use BrokerMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ClientMain!!!! PLZ Stop. Use BrokerMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ClientMain!!!! PLZ Stop. Use BrokerMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ClientMain!!!! PLZ Stop. Use BrokerMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ClientMain!!!! PLZ Stop. Use BrokerMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ClientMain!!!! PLZ Stop. Use BrokerMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ClientMain!!!! PLZ Stop. Use BrokerMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ClientMain!!!! PLZ Stop. Use BrokerMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ClientMain!!!! PLZ Stop. Use BrokerMain instead.");
System.out.println("K thx bye.");
BrokerMain.main(args);
}
}

View File

@ -20,6 +20,7 @@
package com.metamx.druid.http;
import com.google.common.base.Function;
import com.google.inject.Inject;
import com.metamx.druid.Query;
import com.metamx.druid.client.CachingClusteredClient;
import com.metamx.druid.query.FinalizeResultsQueryRunner;
@ -43,6 +44,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
private final ServiceEmitter emitter;
private final CachingClusteredClient baseClient;
@Inject
public ClientQuerySegmentWalker(
QueryToolChestWarehouse warehouse,
ServiceEmitter emitter,

View File

@ -0,0 +1,28 @@
package com.metamx.druid.query;
import com.google.inject.Inject;
import com.metamx.druid.Query;
import java.util.Map;
/**
*/
public class MapQueryToolChestWarehouse implements QueryToolChestWarehouse
{
private final Map<Class<? extends Query>, QueryToolChest> toolchests;
@Inject
public MapQueryToolChestWarehouse(
Map<Class<? extends Query>, QueryToolChest> toolchests
)
{
this.toolchests = toolchests;
}
@Override
@SuppressWarnings("unchecked")
public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(QueryType query)
{
return toolchests.get(query.getClass());
}
}

View File

@ -12,16 +12,14 @@ public class GroupByQueryConfig
@JsonProperty
private int maxIntermediateRows = 50000;
@JsonProperty
private int maxResults = 500000;
public boolean isSingleThreaded()
{
return singleThreaded;
}
public void setSingleThreaded(boolean singleThreaded)
{
this.singleThreaded = singleThreaded;
}
public int getMaxIntermediateRows()
{
return maxIntermediateRows;
@ -31,4 +29,9 @@ public class GroupByQueryConfig
{
this.maxIntermediateRows = maxIntermediateRows;
}
public int getMaxResults()
{
return maxResults;
}
}

View File

@ -22,9 +22,11 @@ package com.metamx.druid.query.group;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.ConcatSequence;
@ -34,7 +36,6 @@ import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.input.MapBasedRow;
import com.metamx.druid.input.Row;
import com.metamx.druid.input.Rows;
@ -42,7 +43,6 @@ import com.metamx.druid.query.MetricManipulationFn;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.dimension.DimensionSpec;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.joda.time.Interval;
import org.joda.time.Minutes;
@ -50,7 +50,6 @@ import org.joda.time.Minutes;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
*/
@ -62,13 +61,14 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
private static final Map<String, String> NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false");
private static final int maxRows;
private final Supplier<GroupByQueryConfig> configSupplier;
static {
// I dislike this static loading of properies, but it's the only mechanism available right now.
Properties props = Initialization.loadProperties();
maxRows = PropUtils.getPropertyAsInt(props, "com.metamx.query.groupBy.maxResults", 500000);
@Inject
public GroupByQueryQueryToolChest(
Supplier<GroupByQueryConfig> configSupplier
)
{
this.configSupplier = configSupplier;
}
@Override
@ -90,6 +90,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
{
final GroupByQueryConfig config = configSupplier.get();
final QueryGranularity gran = query.getGranularity();
final long timeStart = query.getIntervals().get(0).getStartMillis();
@ -133,8 +134,8 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in)
{
if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > maxRows) {
throw new ISE("Computation exceeds maxRows limit[%s]", maxRows);
if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) {
throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
}
return accumulated;

View File

@ -10,13 +10,13 @@ mkdir logs 2>&1 > /dev/null
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=config/realtime/realtime.spec -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/realtime com.metamx.druid.realtime.RealtimeMain 2>&1 > logs/realtime.log &
# And a master node
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/master com.metamx.druid.cli.Main server coordinator 2>&1 > logs/master.log &
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/master io.druid.cli.Main server coordinator 2>&1 > logs/master.log &
# And a compute node
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/compute com.metamx.druid.cli.Main server historical 2>&1 > logs/compute.log &
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/compute io.druid.cli.Main server historical 2>&1 > logs/compute.log &
# And a broker node
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/broker com.metamx.druid.http.BrokerMain 2>&1 > logs/broker.log &
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/broker io.druid.cli.Main server broker 2>&1 > logs/broker.log &
echo "Hit CTRL-C to continue..."
exit 0

View File

@ -80,7 +80,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId>
<version>0.0.3</version>
<version>0.0.4</version>
</dependency>
<dependency>

View File

@ -100,14 +100,6 @@
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>

View File

@ -6,10 +6,7 @@ import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.druid.client.InventoryView;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.ServerInventoryViewProvider;
import com.metamx.druid.client.indexing.IndexingService;
import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.client.indexing.IndexingServiceSelector;
@ -48,13 +45,9 @@ public class CoordinatorModule implements Module
JsonConfigProvider.bind(binder, "druid.manager.segment", DatabaseSegmentManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.rules", DatabaseRuleManagerConfig.class);
binder.bind(InventoryView.class).to(ServerInventoryView.class);
binder.bind(RedirectServlet.class).in(LazySingleton.class);
binder.bind(RedirectFilter.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.announcer", ServerInventoryViewProvider.class);
binder.bind(ServerInventoryView.class).toProvider(ServerInventoryViewProvider.class).in(ManageLifecycle.class);
binder.bind(DatabaseSegmentManager.class)
.toProvider(DatabaseSegmentManagerProvider.class)
.in(ManageLifecycle.class);

View File

@ -6,12 +6,10 @@ import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.logger.Logger;
import com.metamx.druid.DruidProcessingConfig;
import com.metamx.druid.Query;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.concurrent.Execs;
@ -42,20 +40,7 @@ import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig;
import com.metamx.druid.loading.cassandra.CassandraDataSegmentPuller;
import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
import com.metamx.druid.query.MetricsEmittingExecutorService;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.query.group.GroupByQuery;
import com.metamx.druid.query.group.GroupByQueryConfig;
import com.metamx.druid.query.group.GroupByQueryEngine;
import com.metamx.druid.query.group.GroupByQueryRunnerFactory;
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
import com.metamx.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
import com.metamx.druid.query.search.SearchQuery;
import com.metamx.druid.query.search.SearchQueryRunnerFactory;
import com.metamx.druid.query.timeboundary.TimeBoundaryQuery;
import com.metamx.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
import com.metamx.druid.query.timeseries.TimeseriesQuery;
import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.apache.curator.framework.CuratorFramework;
@ -99,24 +84,6 @@ public class HistoricalModule implements Module
bindDeepStorageHdfs(binder);
bindDeepStorageCassandra(binder);
final MapBinder<Class<? extends Query>, QueryRunnerFactory> queryFactoryBinder = MapBinder.newMapBinder(
binder, new TypeLiteral<Class<? extends Query>>(){}, new TypeLiteral<QueryRunnerFactory>(){}
);
queryFactoryBinder.addBinding(TimeseriesQuery.class).to(TimeseriesQueryRunnerFactory.class).in(LazySingleton.class);
queryFactoryBinder.addBinding(SearchQuery.class).to(SearchQueryRunnerFactory.class).in(LazySingleton.class);
queryFactoryBinder.addBinding(TimeBoundaryQuery.class)
.to(TimeBoundaryQueryRunnerFactory.class)
.in(LazySingleton.class);
queryFactoryBinder.addBinding(SegmentMetadataQuery.class)
.to(SegmentMetadataQueryRunnerFactory.class)
.in(LazySingleton.class);
queryFactoryBinder.addBinding(GroupByQuery.class).to(GroupByQueryRunnerFactory.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.query.groupBy", GroupByQueryConfig.class);
binder.bind(GroupByQueryEngine.class).in(LazySingleton.class);
binder.bind(QueryRunnerFactoryConglomerate.class)
.to(DefaultQueryRunnerFactoryConglomerate.class)
.in(LazySingleton.class);

View File

@ -3,8 +3,13 @@ package com.metamx.druid.guice;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import com.google.inject.Binder;
import com.google.inject.Binding;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Provider;
import com.google.inject.TypeLiteral;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.druid.guice.annotations.Global;
import com.metamx.http.client.HttpClient;
@ -13,18 +18,63 @@ import com.metamx.http.client.HttpClientInit;
import org.joda.time.Duration;
import org.joda.time.Period;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.validation.constraints.Min;
import java.lang.annotation.Annotation;
/**
*/
public class HttpClientModule implements Module
{
public static HttpClientModule global()
{
return new HttpClientModule("druid.global.http", Global.class);
}
private final String propertyPrefix;
private Annotation annotation = null;
private Class<? extends Annotation> annotationClazz = null;
public HttpClientModule(String propertyPrefix)
{
this.propertyPrefix = propertyPrefix;
}
public HttpClientModule(String propertyPrefix, Class<? extends Annotation> annotation)
{
this.propertyPrefix = propertyPrefix;
this.annotationClazz = annotation;
}
public HttpClientModule(String propertyPrefix, Annotation annotation)
{
this.propertyPrefix = propertyPrefix;
this.annotation = annotation;
}
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.global.http", DruidHttpClientConfig.class);
if (annotation != null) {
JsonConfigProvider.bind(binder, propertyPrefix, DruidHttpClientConfig.class, annotation);
binder.bind(HttpClient.class)
.annotatedWith(annotation)
.toProvider(new HttpClientProvider(annotation))
.in(LazySingleton.class);
}
else if (annotationClazz != null) {
JsonConfigProvider.bind(binder, propertyPrefix, DruidHttpClientConfig.class, annotationClazz);
binder.bind(HttpClient.class)
.annotatedWith(annotationClazz)
.toProvider(new HttpClientProvider(annotationClazz))
.in(LazySingleton.class);
}
else {
JsonConfigProvider.bind(binder, propertyPrefix, DruidHttpClientConfig.class);
binder.bind(HttpClient.class)
.toProvider(new HttpClientProvider())
.in(LazySingleton.class);
}
}
public static class DruidHttpClientConfig
@ -47,26 +97,56 @@ public class HttpClientModule implements Module
}
}
@Provides @LazySingleton @Global
public HttpClient makeHttpClient(
Supplier<DruidHttpClientConfig> configSupplier,
Lifecycle lifecycle,
@Nullable SSLContext sslContext
)
public static class HttpClientProvider implements Provider<HttpClient>
{
final DruidHttpClientConfig config = configSupplier.get();
private final Key<Supplier<DruidHttpClientConfig>> configKey;
private final Key<SSLContext> sslContextKey;
final HttpClientConfig.Builder builder = HttpClientConfig
.builder()
.withNumConnections(config.getNumConnections())
.withReadTimeout(config.getReadTimeout());
private Provider<Supplier<DruidHttpClientConfig>> configProvider;
private Provider<Lifecycle> lifecycleProvider;
private Binding<SSLContext> sslContextBinding;
if (sslContext != null) {
builder.withSslContext(sslContext);
public HttpClientProvider()
{
configKey = Key.get(new TypeLiteral<Supplier<DruidHttpClientConfig>>(){});
sslContextKey = Key.get(SSLContext.class);
}
return HttpClientInit.createClient(builder.build(), lifecycle);
public HttpClientProvider(Annotation annotation)
{
configKey = Key.get(new TypeLiteral<Supplier<DruidHttpClientConfig>>(){}, annotation);
sslContextKey = Key.get(SSLContext.class, annotation);
}
public HttpClientProvider(Class<? extends Annotation> annotation)
{
configKey = Key.get(new TypeLiteral<Supplier<DruidHttpClientConfig>>(){}, annotation);
sslContextKey = Key.get(SSLContext.class, annotation);
}
@Inject
public void configure(Injector injector)
{
configProvider = injector.getProvider(configKey);
sslContextBinding = injector.getExistingBinding(sslContextKey);
lifecycleProvider = injector.getProvider(Lifecycle.class);
}
@Override
public HttpClient get()
{
final DruidHttpClientConfig config = configProvider.get().get();
final HttpClientConfig.Builder builder = HttpClientConfig
.builder()
.withNumConnections(config.getNumConnections())
.withReadTimeout(config.getReadTimeout());
if (sslContextBinding != null) {
builder.withSslContext(sslContextBinding.getProvider().get());
}
return HttpClientInit.createClient(builder.build(), lifecycleProvider.get());
}
}
}

View File

@ -0,0 +1,52 @@
package com.metamx.druid.guice;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import com.metamx.druid.Query;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.group.GroupByQuery;
import com.metamx.druid.query.group.GroupByQueryEngine;
import com.metamx.druid.query.group.GroupByQueryRunnerFactory;
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
import com.metamx.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
import com.metamx.druid.query.search.SearchQuery;
import com.metamx.druid.query.search.SearchQueryRunnerFactory;
import com.metamx.druid.query.timeboundary.TimeBoundaryQuery;
import com.metamx.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
import com.metamx.druid.query.timeseries.TimeseriesQuery;
import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import java.util.Map;
/**
*/
public class QueryRunnerFactoryModule extends QueryToolChestModule
{
final Map<Class<? extends Query>, Class<? extends QueryRunnerFactory>> mappings =
ImmutableMap.<Class<? extends Query>, Class<? extends QueryRunnerFactory>>builder()
.put(TimeseriesQuery.class, TimeseriesQueryRunnerFactory.class)
.put(SearchQuery.class, SearchQueryRunnerFactory.class)
.put(TimeBoundaryQuery.class, TimeBoundaryQueryRunnerFactory.class)
.put(SegmentMetadataQuery.class, SegmentMetadataQueryRunnerFactory.class)
.put(GroupByQuery.class, GroupByQueryRunnerFactory.class)
.build();
@Override
public void configure(Binder binder)
{
super.configure(binder);
final MapBinder<Class<? extends Query>, QueryRunnerFactory> queryFactoryBinder = MapBinder.newMapBinder(
binder, new TypeLiteral<Class<? extends Query>>(){}, new TypeLiteral<QueryRunnerFactory>(){}
);
for (Map.Entry<Class<? extends Query>, Class<? extends QueryRunnerFactory>> entry : mappings.entrySet()) {
queryFactoryBinder.addBinding(entry.getKey()).to(entry.getValue());
binder.bind(entry.getValue()).in(LazySingleton.class);
}
binder.bind(GroupByQueryEngine.class).in(LazySingleton.class);
}
}

View File

@ -42,6 +42,7 @@ import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.group.GroupByQuery;
import com.metamx.druid.query.group.GroupByQueryConfig;
import com.metamx.druid.query.group.GroupByQueryEngine;
import com.metamx.druid.query.group.GroupByQueryQueryToolChest;
import com.metamx.druid.query.group.GroupByQueryRunnerFactory;
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
import com.metamx.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
@ -107,6 +108,7 @@ public class ServerInit
return new ComputeScratchPool(config.intermediateComputeSizeBytes());
}
// TODO: Get rid of this method
public static Map<Class<? extends Query>, QueryRunnerFactory> initDefaultQueryTypes(
ConfigurationObjectFactory configFactory,
StupidPool<ByteBuffer> computationBufferPool
@ -114,14 +116,13 @@ public class ServerInit
{
Map<Class<? extends Query>, QueryRunnerFactory> queryRunners = Maps.newLinkedHashMap();
queryRunners.put(TimeseriesQuery.class, new TimeseriesQueryRunnerFactory());
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(new GroupByQueryConfig());
queryRunners.put(
GroupByQuery.class,
new GroupByQueryRunnerFactory(
new GroupByQueryEngine(
Suppliers.ofInstance(new GroupByQueryConfig()), // TODO: Get rid of this
computationBufferPool
),
Suppliers.ofInstance(new GroupByQueryConfig())
new GroupByQueryEngine(configSupplier, computationBufferPool),
configSupplier,
new GroupByQueryQueryToolChest(configSupplier)
)
);
queryRunners.put(SearchQuery.class, new SearchQueryRunnerFactory());

View File

@ -48,19 +48,20 @@ import java.util.concurrent.Future;
*/
public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupByQuery>
{
private static final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest();
private final GroupByQueryEngine engine;
private final Supplier<GroupByQueryConfig> config;
private final GroupByQueryQueryToolChest toolChest;
@Inject
public GroupByQueryRunnerFactory(
GroupByQueryEngine engine,
Supplier<GroupByQueryConfig> config
Supplier<GroupByQueryConfig> config,
GroupByQueryQueryToolChest toolChest
)
{
this.engine = engine;
this.config = config;
this.toolChest = toolChest;
}
@Override
@ -122,7 +123,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
}
@Override
public QueryToolChest getToolchest()
public QueryToolChest<Row, GroupByQuery> getToolchest()
{
return toolChest;
}

View File

@ -81,9 +81,10 @@ public class GroupByQueryRunnerTest
GroupByQueryConfig config = new GroupByQueryConfig();
config.setMaxIntermediateRows(10000);
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
new GroupByQueryEngine(
Suppliers.ofInstance(config),
configSupplier,
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@ -95,7 +96,8 @@ public class GroupByQueryRunnerTest
}
)
),
Suppliers.ofInstance(config)
configSupplier,
new GroupByQueryQueryToolChest(configSupplier)
);
return Lists.newArrayList(
@ -245,7 +247,7 @@ public class GroupByQueryRunnerTest
final GroupByQuery fullQuery = builder.build();
final GroupByQuery allGranQuery = builder.copy().setGranularity(QueryGranularity.ALL).build();
QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults(
QueryRunner mergedRunner = factory.getToolchest().mergeResults(
new QueryRunner<Row>()
{
@Override
@ -332,7 +334,7 @@ public class GroupByQueryRunnerTest
createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L)
);
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest().mergeResults(runner);
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, limit), mergeRunner.run(fullQuery), String.format("limit: %d", limit)
@ -437,7 +439,7 @@ public class GroupByQueryRunnerTest
final GroupByQuery fullQuery = builder.build();
QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults(
QueryRunner mergedRunner = factory.getToolchest().mergeResults(
new QueryRunner<Row>()
{
@Override
@ -490,7 +492,7 @@ public class GroupByQueryRunnerTest
createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L)
);
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest().mergeResults(runner);
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
TestHelper.assertExpectedObjects(
@ -530,7 +532,7 @@ public class GroupByQueryRunnerTest
createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 269L)
);
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest().mergeResults(runner);
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build()), "limited"
@ -569,7 +571,7 @@ public class GroupByQueryRunnerTest
createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 178.24917602539062D)
);
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest().mergeResults(runner);
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
TestHelper.assertExpectedObjects(
Iterables.limit(expectedResults, 5), mergeRunner.run(builder.limit(5).build()), "limited"
@ -608,7 +610,7 @@ public class GroupByQueryRunnerTest
final GroupByQuery fullQuery = builder.build();
QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults(
QueryRunner mergedRunner = factory.getToolchest().mergeResults(
new QueryRunner<Row>()
{
@Override
@ -651,7 +653,7 @@ public class GroupByQueryRunnerTest
createExpectedRow("2011-04-01", "quality", "automotive", "rows", 2L)
);
QueryRunner<Row> mergeRunner = new GroupByQueryQueryToolChest().mergeResults(runner);
QueryRunner<Row> mergeRunner = factory.getToolchest().mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query), "no-limit");
}

View File

@ -55,9 +55,10 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
GroupByQueryConfig config = new GroupByQueryConfig();
config.setMaxIntermediateRows(10000);
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
new GroupByQueryEngine(
Suppliers.ofInstance(config),
configSupplier,
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@ -69,7 +70,8 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
}
)
),
Suppliers.ofInstance(config)
configSupplier,
new GroupByQueryQueryToolChest(configSupplier)
);
final Collection<?> objects = QueryRunnerTestHelper.makeQueryRunners(factory);

View File

@ -0,0 +1,56 @@
package io.druid.cli;
import com.google.inject.Injector;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.CuratorModule;
import com.metamx.druid.guice.BrokerModule;
import com.metamx.druid.guice.HttpClientModule;
import com.metamx.druid.guice.LifecycleModule;
import com.metamx.druid.guice.QueryToolChestModule;
import com.metamx.druid.guice.QueryableModule;
import com.metamx.druid.guice.ServerModule;
import com.metamx.druid.guice.ServerViewModule;
import com.metamx.druid.guice.annotations.Client;
import com.metamx.druid.http.ClientQuerySegmentWalker;
import com.metamx.druid.http.StatusResource;
import com.metamx.druid.initialization.EmitterModule;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.JettyServerModule;
import com.metamx.druid.metrics.MetricsModule;
import io.airlift.command.Command;
/**
*/
@Command(
name = "broker",
description = "Runs a broker node, see https://github.com/metamx/druid/wiki/Broker for a description"
)
public class CliBroker extends ServerRunnable
{
private static final Logger log = new Logger(CliBroker.class);
public CliBroker()
{
super(log);
}
@Override
protected Injector getInjector()
{
return Initialization.makeInjector(
new LifecycleModule(),
EmitterModule.class,
HttpClientModule.global(),
CuratorModule.class,
new MetricsModule(),
ServerModule.class,
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class),
new QueryableModule(ClientQuerySegmentWalker.class),
new QueryToolChestModule(),
new ServerViewModule(),
new HttpClientModule("druid.broker.http", Client.class),
new BrokerModule()
);
}
}

View File

@ -11,6 +11,7 @@ import com.metamx.druid.guice.HttpClientModule;
import com.metamx.druid.guice.JacksonConfigManagerModule;
import com.metamx.druid.guice.LifecycleModule;
import com.metamx.druid.guice.ServerModule;
import com.metamx.druid.guice.ServerViewModule;
import com.metamx.druid.guice.annotations.Self;
import com.metamx.druid.http.InfoResource;
import com.metamx.druid.http.MasterResource;
@ -55,7 +56,7 @@ public class CliCoordinator extends ServerRunnable
return Initialization.makeInjector(
new LifecycleModule().register(DruidMaster.class),
EmitterModule.class,
HttpClientModule.class,
HttpClientModule.global(),
DbConnectorModule.class,
JacksonConfigManagerModule.class,
CuratorModule.class,
@ -66,6 +67,7 @@ public class CliCoordinator extends ServerRunnable
.addResource(InfoResource.class)
.addResource(MasterResource.class)
.addResource(StatusResource.class),
new ServerViewModule(),
CoordinatorModule.class
);
}

View File

@ -1,7 +1,6 @@
package io.druid.cli;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.logger.Logger;
import com.metamx.druid.coordination.ServerManager;
import com.metamx.druid.coordination.ZkCoordinator;
@ -9,25 +8,16 @@ import com.metamx.druid.curator.CuratorModule;
import com.metamx.druid.guice.HistoricalModule;
import com.metamx.druid.guice.HttpClientModule;
import com.metamx.druid.guice.LifecycleModule;
import com.metamx.druid.guice.QueryRunnerFactoryModule;
import com.metamx.druid.guice.QueryableModule;
import com.metamx.druid.guice.ServerModule;
import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusResource;
import com.metamx.druid.initialization.EmitterModule;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.JettyServerInitializer;
import com.metamx.druid.initialization.JettyServerModule;
import com.metamx.druid.metrics.MetricsModule;
import com.metamx.druid.metrics.ServerMonitor;
import io.airlift.command.Command;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
/**
*/
@ -50,34 +40,15 @@ public class CliHistorical extends ServerRunnable
return Initialization.makeInjector(
new LifecycleModule().register(ZkCoordinator.class),
EmitterModule.class,
HttpClientModule.class,
HttpClientModule.global(),
CuratorModule.class,
new MetricsModule().register(ServerMonitor.class),
ServerModule.class,
new JettyServerModule(new HistoricalJettyServerInitializer())
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class),
new QueryableModule(ServerManager.class),
new QueryRunnerFactoryModule(),
HistoricalModule.class
);
}
private static class HistoricalJettyServerInitializer implements JettyServerInitializer
{
@Override
public void initialize(Server server, Injector injector)
{
final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS);
queries.setResourceBase("/");
queries.addServlet(new ServletHolder(injector.getInstance(QueryServlet.class)), "/druid/v2/*");
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{queries, root, new DefaultHandler()});
server.setHandler(handlerList);
}
}
}

View File

@ -19,7 +19,7 @@ public class Main
builder.withGroup("server")
.withDescription("Run one of the Druid server types.")
.withDefaultCommand(Help.class)
.withCommands(CliCoordinator.class, CliHistorical.class);
.withCommands(CliCoordinator.class, CliHistorical.class, CliBroker.class);
builder.build().parse(args).run();
}

View File

@ -0,0 +1,36 @@
package io.druid.cli;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
/**
*/
public class QueryJettyServerInitializer implements JettyServerInitializer
{
@Override
public void initialize(Server server, Injector injector)
{
final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS);
queries.setResourceBase("/");
queries.addServlet(new ServletHolder(injector.getInstance(QueryServlet.class)), "/druid/v2/*");
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{queries, root, new DefaultHandler()});
server.setHandler(handlerList);
}
}