Merge branch 'master' into autoscaling

Conflicts:
	client/src/main/java/com/metamx/druid/http/BrokerMain.java
	merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
	merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java
This commit is contained in:
Fangjin Yang 2012-11-02 15:57:16 -07:00
commit b1b611735e
37 changed files with 2364 additions and 1685 deletions

6
.gitignore vendored
View File

@ -9,10 +9,6 @@ target
.classpath
.idea
.project
.settings/org.eclipse.jdt.core.prefs
.settings/org.maven.ide.eclipse.prefs
client/.settings/org.eclipse.jdt.core.prefs
common/.settings/org.eclipse.jdt.core.prefs
server/.settings/org.eclipse.jdt.core.prefs
.settings/
examples/rand/RealtimeNode.out
examples/twitter/RealtimeNode.out

View File

@ -18,22 +18,13 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-client</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>druid-client</name>
<description>druid-client</description>
<scm>
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url>
</scm>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<parent>
<groupId>com.metamx</groupId>
@ -55,112 +46,141 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
<version>0.6.1</version>
<artifactId>emitter</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
<version>0.0.7</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.26</version>
<artifactId>http-client</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>11.0.1</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
<version>1.9.9</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-smile</artifactId>
<version>1.9.9</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
<version>0.9</version>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId>
<version>0.0.2</version>
</dependency>
<dependency>
<groupId>com.davekoelle</groupId>
<artifactId>alphanum</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-client</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>6.1.26</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>
<version>4.8.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-smile</artifactId>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.4.Final</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.0</version>
<scope>test</scope>
</dependency>
</dependencies>
@ -168,18 +188,7 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<phase>package</phase>
@ -195,24 +204,7 @@
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-help-plugin</artifactId>
<version>2.1.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7.2</version>
<configuration>
<systemPropertyVariables>
<user.timezone>UTC</user.timezone>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<goals>

View File

@ -0,0 +1,413 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.index.v1.serde.ComplexMetricRegistererer;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ZkClientConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import com.metamx.phonebook.PhoneBook;
import org.I0Itec.zkclient.ZkClient;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.jsontype.NamedType;
import org.codehaus.jackson.smile.SmileFactory;
import org.mortbay.jetty.Server;
import org.skife.config.ConfigurationObjectFactory;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
public abstract class BaseNode<T extends BaseNode>
{
private final Logger log;
private final Lifecycle lifecycle;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final Properties props;
private final ConfigurationObjectFactory configFactory;
private PhoneBook phoneBook = null;
private ServiceEmitter emitter = null;
private List<Monitor> monitors = null;
private Server server = null;
private ZkClient zkClient;
private ScheduledExecutorFactory scheduledExecutorFactory;
private RequestLogger requestLogger;
private boolean initialized = false;
public BaseNode(
Logger log,
Properties props,
Lifecycle lifecycle,
ObjectMapper jsonMapper,
ObjectMapper smileMapper,
ConfigurationObjectFactory configFactory
)
{
this.log = log;
this.configFactory = configFactory;
this.props = props;
this.jsonMapper = jsonMapper;
this.lifecycle = lifecycle;
this.smileMapper = smileMapper;
Preconditions.checkNotNull(props, "props");
Preconditions.checkNotNull(lifecycle, "lifecycle");
Preconditions.checkNotNull(jsonMapper, "jsonMapper");
Preconditions.checkNotNull(smileMapper, "smileMapper");
Preconditions.checkNotNull(configFactory, "configFactory");
Preconditions.checkState(smileMapper.getJsonFactory() instanceof SmileFactory, "smileMapper should use smile.");
}
@SuppressWarnings("unchecked")
public T setZkClient(ZkClient zkClient)
{
checkFieldNotSetAndSet("zkClient", zkClient);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setPhoneBook(PhoneBook phoneBook)
{
checkFieldNotSetAndSet("phoneBook", phoneBook);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setEmitter(ServiceEmitter emitter)
{
checkFieldNotSetAndSet("emitter", emitter);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setMonitors(List<Monitor> monitors)
{
checkFieldNotSetAndSet("monitors", monitors);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setServer(Server server)
{
checkFieldNotSetAndSet("server", server);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setScheduledExecutorFactory(ScheduledExecutorFactory factory)
{
checkFieldNotSetAndSet("scheduledExecutorFactory", factory);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setRequestLogger(RequestLogger requestLogger)
{
checkFieldNotSetAndSet("requestLogger", requestLogger);
return (T) this;
}
@SuppressWarnings("unchecked")
public T registerJacksonSubtype(Class<?>... clazzes)
{
jsonMapper.registerSubtypes(clazzes);
smileMapper.registerSubtypes(clazzes);
return (T) this;
}
@SuppressWarnings("unchecked")
public T registerJacksonSubtype(NamedType... namedTypes)
{
jsonMapper.registerSubtypes(namedTypes);
smileMapper.registerSubtypes(namedTypes);
return (T) this;
}
@SuppressWarnings("unchecked")
public T registerComplexMetric(ComplexMetricRegistererer registererer)
{
registererer.register();
return (T) this;
}
public Lifecycle getLifecycle()
{
return lifecycle;
}
public ObjectMapper getJsonMapper()
{
return jsonMapper;
}
public ObjectMapper getSmileMapper()
{
return smileMapper;
}
public Properties getProps()
{
return props;
}
public ConfigurationObjectFactory getConfigFactory()
{
return configFactory;
}
public ZkClient getZkClient()
{
initializeZkClient();
return zkClient;
}
public PhoneBook getPhoneBook()
{
initializePhoneBook();
return phoneBook;
}
public ServiceEmitter getEmitter()
{
initializeEmitter();
return emitter;
}
public List<Monitor> getMonitors()
{
initializeMonitors();
return monitors;
}
public Server getServer()
{
initializeServer();
return server;
}
public ScheduledExecutorFactory getScheduledExecutorFactory()
{
initializeScheduledExecutorFactory();
return scheduledExecutorFactory;
}
public RequestLogger getRequestLogger()
{
initializeRequestLogger();
return requestLogger;
}
private void initializeRequestLogger()
{
if (requestLogger == null) {
try {
setRequestLogger(Initialization.makeRequestLogger(getScheduledExecutorFactory(), getProps()));
}
catch (IOException e) {
throw Throwables.propagate(e);
}
lifecycle.addManagedInstance(requestLogger);
}
}
private void initializeScheduledExecutorFactory()
{
if (scheduledExecutorFactory == null) {
setScheduledExecutorFactory(ScheduledExecutors.createFactory(getLifecycle()));
}
}
private void initializeZkClient()
{
if (zkClient == null) {
setZkClient(Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle));
}
}
private void initializePhoneBook()
{
if (phoneBook == null) {
setPhoneBook(
Initialization.createPhoneBook(
jsonMapper,
getZkClient(),
"PhoneBook--%s",
lifecycle
)
);
}
}
private void initializeServer()
{
if (server == null) {
setServer(Initialization.makeJettyServer(configFactory.build(ServerConfig.class)));
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
log.info("Starting Jetty");
server.start();
}
@Override
public void stop()
{
log.info("Stopping Jetty");
try {
server.stop();
}
catch (Exception e) {
log.error(e, "Exception thrown while stopping Jetty");
}
}
}
);
}
}
private void initializeMonitors()
{
if (monitors == null) {
List<Monitor> theMonitors = Lists.newArrayList();
theMonitors.add(new JvmMonitor());
if (Boolean.parseBoolean(props.getProperty("druid.monitoring.monitorSystem", "false"))) {
theMonitors.add(new SysMonitor());
}
setMonitors(theMonitors);
}
}
private void initializeEmitter()
{
if (emitter == null) {
final HttpClient httpClient = HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(1).build(), lifecycle
);
setEmitter(
new ServiceEmitter(
PropUtils.getProperty(props, "druid.service"),
PropUtils.getProperty(props, "druid.host"),
Emitters.create(props, httpClient, jsonMapper, lifecycle)
)
);
}
EmittingLogger.registerEmitter(emitter);
}
protected void init() throws Exception
{
doInit();
initialized = true;
}
protected abstract void doInit() throws Exception;
@LifecycleStart
public synchronized void start() throws Exception
{
if (! initialized) {
init();
}
lifecycle.start();
}
@LifecycleStop
public synchronized void stop()
{
lifecycle.stop();
}
protected ScheduledExecutorService startMonitoring(List<Monitor> monitors)
{
final ScheduledExecutorService globalScheduledExec = getScheduledExecutorFactory().create(1, "Global--%d");
final MonitorScheduler monitorScheduler = new MonitorScheduler(
getConfigFactory().build(MonitorSchedulerConfig.class),
globalScheduledExec,
getEmitter(),
monitors
);
getLifecycle().addManagedInstance(monitorScheduler);
return globalScheduledExec;
}
protected void checkFieldNotSetAndSet(String fieldName, Object value)
{
Class<?> theClazz = this.getClass();
while (theClazz != null && theClazz != Object.class) {
try {
final Field field = theClazz.getDeclaredField(fieldName);
field.setAccessible(true);
Preconditions.checkState(field.get(this) == null, "Cannot set %s once it has already been set.", fieldName);
field.set(this, value);
return;
}
catch (NoSuchFieldException e) {
// Perhaps it is inherited?
theClazz = theClazz.getSuperclass();
}
catch (IllegalAccessException e) {
throw Throwables.propagate(e);
}
}
throw new ISE("Unknown field[%s] on class[%s]", fieldName, this.getClass());
}
}

View File

@ -38,9 +38,9 @@ import java.util.concurrent.Executor;
/**
*/
public class ClientSideServerView implements MutableServerView
public class BrokerServerView implements MutableServerView
{
private static final Logger log = new Logger(ClientSideServerView.class);
private static final Logger log = new Logger(BrokerServerView.class);
private final Object lock = new Object();
@ -53,7 +53,7 @@ public class ClientSideServerView implements MutableServerView
private final ObjectMapper smileMapper;
private final HttpClient httpClient;
public ClientSideServerView(
public BrokerServerView(
QueryToolChestWarehouse warehose,
ObjectMapper smileMapper,
HttpClient httpClient

View File

@ -23,19 +23,18 @@ import org.skife.config.Config;
/**
*/
public abstract class ClientConfig
public abstract class ClientConfig extends InventoryManagerConfig
{
public ClientConfig()
{
super(null, null);
}
@Override
@Config("druid.zk.paths.announcementsPath")
public abstract String getAnnouncementsPath();
public abstract String getInventoryIdPath();
@Override
@Config("druid.zk.paths.servedSegmentsPath")
public abstract String getServedSegmentsPath();
public InventoryManagerConfig getClientInventoryManagerConfig()
{
return new InventoryManagerConfig(
getAnnouncementsPath(),
getServedSegmentsPath()
);
}
public abstract String getInventoryPath();
}

View File

@ -19,55 +19,9 @@
package com.metamx.druid.http;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.CachingClusteredClient;
import com.metamx.druid.client.ClientConfig;
import com.metamx.druid.client.ClientInventoryManager;
import com.metamx.druid.client.ClientSideServerView;
import com.metamx.druid.client.cache.CacheBroker;
import com.metamx.druid.client.cache.CacheMonitor;
import com.metamx.druid.client.cache.MapCacheBroker;
import com.metamx.druid.client.cache.MapCacheBrokerConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.initialization.ZkClientConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.query.QueryToolChestWarehouse;
import com.metamx.druid.query.ReflectionQueryToolChestWarehouse;
import com.metamx.emitter.core.Emitters;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import com.metamx.phonebook.PhoneBook;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.x.discovery.ServiceDiscovery;
import org.I0Itec.zkclient.ZkClient;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.smile.SmileFactory;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
@ -80,125 +34,20 @@ public class BrokerMain
{
LogLevelAdjuster.register();
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final ObjectMapper smileMapper = new DefaultObjectMapper(new SmileFactory());
smileMapper.getJsonFactory().setCodec(smileMapper);
Lifecycle lifecycle = new Lifecycle();
final Properties props = Initialization.loadProperties();
final Lifecycle lifecycle = new Lifecycle();
final ConfigurationObjectFactory configFactory = Config.createFactory(props);
final ZkClient zkClient = Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle);
final PhoneBook phoneBook = Initialization.createYellowPages(
jsonMapper, zkClient, "Client-ZKYP--%s", lifecycle
lifecycle.addManagedInstance(
BrokerNode.builder().build()
);
final HttpClient httpClient = HttpClientInit.createClient(
HttpClientConfig.builder()
.withNumConnections(
Integer.parseInt(props.getProperty("druid.client.http.connections"))
)
.build(),
lifecycle
);
final HttpClient emitterHttpClient = HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(1).build(), lifecycle
);
final ServiceEmitter emitter = new ServiceEmitter(
props.getProperty("druid.service"),
props.getProperty("druid.host"),
Emitters.create(props, emitterHttpClient, jsonMapper, lifecycle)
);
final QueryToolChestWarehouse warehouse = new ReflectionQueryToolChestWarehouse();
final ClientConfig clientConfig = configFactory.build(ClientConfig.class);
final ClientSideServerView view = new ClientSideServerView(warehouse, smileMapper, httpClient);
final ClientInventoryManager clientInventoryManager = new ClientInventoryManager(
clientConfig.getClientInventoryManagerConfig(),
phoneBook,
view
);
lifecycle.addManagedInstance(clientInventoryManager);
final CacheBroker cacheBroker = MapCacheBroker.create(
configFactory.buildWithReplacements(MapCacheBrokerConfig.class, ImmutableMap.of("prefix", "druid.bard.cache"))
);
final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cacheBroker, smileMapper);
lifecycle.addManagedInstance(baseClient);
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
final MonitorScheduler monitorScheduler = new MonitorScheduler(
configFactory.build(MonitorSchedulerConfig.class),
globalScheduledExec,
emitter,
ImmutableList.<Monitor>of(
new JvmMonitor(),
new SysMonitor(),
new CacheMonitor(cacheBroker)
)
);
lifecycle.addManagedInstance(monitorScheduler);
final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
CuratorFramework curatorFramework = Initialization.makeCuratorFrameworkClient(
serviceDiscoveryConfig,
lifecycle
);
final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
curatorFramework,
configFactory.build(ServiceDiscoveryConfig.class),
lifecycle
);
final RequestLogger requestLogger = Initialization.makeRequestLogger(
scheduledExecutorFactory.create(
1,
"RequestLogger--%d"
),
props
);
lifecycle.addManagedInstance(requestLogger);
final ClientQuerySegmentWalker texasRanger = new ClientQuerySegmentWalker(warehouse, emitter, baseClient);
final Injector injector = Guice.createInjector(new ClientServletModule(texasRanger, clientInventoryManager, jsonMapper));
final Server server = Initialization.makeJettyServer(configFactory.build(ServerConfig.class));
final Context root = new Context(server, "/druid/v2", Context.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(
new ServletHolder(new QueryServlet(jsonMapper, smileMapper, texasRanger, emitter, requestLogger)),
"/*"
);
root.addEventListener(new GuiceServletConfig(injector));
root.addFilter(GuiceFilter.class, "/heatmap/*", 0);
root.addFilter(GuiceFilter.class, "/datasources/*", 0);
try {
lifecycle.start();
}
catch (Throwable t) {
log.error(t, "Error when starting up. Failing.");
System.exit(1);
log.info(t, "Throwable caught at startup, committing seppuku");
System.exit(2);
}
Runtime.getRuntime().addShutdownHook(
new Thread(
new Runnable()
{
@Override
public void run()
{
log.info("Running shutdown hook");
lifecycle.stop();
}
}
)
);
server.start();
server.join();
lifecycle.join();
}
}

View File

@ -0,0 +1,322 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.http;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.ISE;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.BaseNode;
import com.metamx.druid.client.BrokerServerView;
import com.metamx.druid.client.CachingClusteredClient;
import com.metamx.druid.client.ClientConfig;
import com.metamx.druid.client.ClientInventoryManager;
import com.metamx.druid.client.cache.CacheBroker;
import com.metamx.druid.client.cache.CacheMonitor;
import com.metamx.druid.client.cache.MapCacheBroker;
import com.metamx.druid.client.cache.MapCacheBrokerConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.query.QueryToolChestWarehouse;
import com.metamx.druid.query.ReflectionQueryToolChestWarehouse;
import com.metamx.druid.utils.PropUtils;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.metrics.Monitor;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.x.discovery.ServiceDiscovery;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.smile.SmileFactory;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import java.util.List;
import java.util.Properties;
/**
*/
public class BrokerNode extends BaseNode<BrokerNode>
{
private static final Logger log = new Logger(BrokerNode.class);
private final List<Module> extraModules = Lists.newArrayList();
private final List<String> pathsForGuiceFilter = Lists.newArrayList();
private QueryToolChestWarehouse warehouse = null;
private HttpClient brokerHttpClient = null;
private CacheBroker cacheBroker = null;
private boolean useDiscovery = true;
public static Builder builder()
{
return new Builder();
}
public BrokerNode(
Properties props,
Lifecycle lifecycle,
ObjectMapper jsonMapper,
ObjectMapper smileMapper,
ConfigurationObjectFactory configFactory
)
{
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
}
public QueryToolChestWarehouse getWarehouse()
{
initializeWarehouse();
return warehouse;
}
public BrokerNode setWarehouse(QueryToolChestWarehouse warehouse)
{
checkFieldNotSetAndSet("warehouse", warehouse);
return this;
}
public HttpClient getBrokerHttpClient()
{
initializeBrokerHttpClient();
return brokerHttpClient;
}
public BrokerNode setBrokerHttpClient(HttpClient brokerHttpClient)
{
checkFieldNotSetAndSet("brokerHttpClient", brokerHttpClient);
return this;
}
public CacheBroker getCacheBroker()
{
initializeCacheBroker();
return cacheBroker;
}
public BrokerNode setCacheBroker(CacheBroker cacheBroker)
{
checkFieldNotSetAndSet("cacheBroker", cacheBroker);
return this;
}
public BrokerNode useDiscovery(boolean useDiscovery)
{
this.useDiscovery = useDiscovery;
return this;
}
/**
* This method allows you to specify more Guice modules to use primarily for injected extra Jersey resources.
* I'd like to remove the Guice dependency for this, but I don't know how to set up Jersey without Guice...
*
* This is deprecated because at some point in the future, we will eliminate the Guice dependency and anything
* that uses this will break. Use at your own risk.
*
* @param module the module to register with Guice
*
* @return this
*/
@Deprecated
public BrokerNode addModule(Module module)
{
extraModules.add(module);
return this;
}
/**
* This method is used to specify extra paths that the GuiceFilter should pay attention to.
*
* This is deprecated for the same reason that addModule is deprecated.
*
* @param path the path that the GuiceFilter should pay attention to.
*
* @return this
*/
@Deprecated
public BrokerNode addPathForGuiceFilter(String path)
{
pathsForGuiceFilter.add(path);
return this;
}
@Override
protected void doInit() throws Exception
{
initializeWarehouse();
initializeBrokerHttpClient();
initializeCacheBroker();
initializeDiscovery();
final Lifecycle lifecycle = getLifecycle();
final List<Monitor> monitors = getMonitors();
monitors.add(new CacheMonitor(cacheBroker));
startMonitoring(monitors);
final BrokerServerView view = new BrokerServerView(warehouse, getSmileMapper(), brokerHttpClient);
final ClientInventoryManager clientInventoryManager = new ClientInventoryManager(
getConfigFactory().build(ClientConfig.class), getPhoneBook(), view
);
lifecycle.addManagedInstance(clientInventoryManager);
final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cacheBroker, getSmileMapper());
lifecycle.addManagedInstance(baseClient);
final ClientQuerySegmentWalker texasRanger = new ClientQuerySegmentWalker(warehouse, getEmitter(), baseClient);
List<Module> theModules = Lists.newArrayList();
theModules.add(new ClientServletModule(texasRanger, clientInventoryManager, getJsonMapper()));
theModules.addAll(extraModules);
final Injector injector = Guice.createInjector(theModules);
final Context root = new Context(getServer(), "/druid/v2", Context.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(
new ServletHolder(new QueryServlet(getJsonMapper(), getSmileMapper(), texasRanger, getEmitter(), getRequestLogger())),
"/*"
);
root.addEventListener(new GuiceServletConfig(injector));
root.addFilter(GuiceFilter.class, "/datasources/*", 0);
for (String path : pathsForGuiceFilter) {
root.addFilter(GuiceFilter.class, path, 0);
}
}
private void initializeDiscovery() throws Exception
{
if (useDiscovery) {
final Lifecycle lifecycle = getLifecycle();
final ServiceDiscoveryConfig serviceDiscoveryConfig = getConfigFactory().build(ServiceDiscoveryConfig.class);
CuratorFramework curatorFramework = Initialization.makeCuratorFrameworkClient(
serviceDiscoveryConfig.getZkHosts(), lifecycle
);
final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
curatorFramework, serviceDiscoveryConfig, lifecycle
);
}
}
private void initializeCacheBroker()
{
if (cacheBroker == null) {
setCacheBroker(
MapCacheBroker.create(
getConfigFactory().buildWithReplacements(
MapCacheBrokerConfig.class,
ImmutableMap.of("prefix", "druid.bard.cache")
)
)
);
}
}
private void initializeBrokerHttpClient()
{
if (brokerHttpClient == null) {
setBrokerHttpClient(
HttpClientInit.createClient(
HttpClientConfig
.builder()
.withNumConnections(PropUtils.getPropertyAsInt(getProps(), "druid.client.http.connections"))
.build(),
getLifecycle()
)
);
}
}
private void initializeWarehouse()
{
if (warehouse == null) {
setWarehouse(new ReflectionQueryToolChestWarehouse());
}
}
public static class Builder
{
private ObjectMapper jsonMapper = null;
private ObjectMapper smileMapper = null;
private Lifecycle lifecycle = null;
private Properties props = null;
private ConfigurationObjectFactory configFactory = null;
public Builder withMappers(ObjectMapper jsonMapper, ObjectMapper smileMapper)
{
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
return this;
}
public Builder withProps(Properties props)
{
this.props = props;
return this;
}
public Builder withConfigFactory(ConfigurationObjectFactory configFactory)
{
this.configFactory = configFactory;
return this;
}
public BrokerNode build()
{
if (jsonMapper == null && smileMapper == null) {
jsonMapper = new DefaultObjectMapper();
smileMapper = new DefaultObjectMapper(new SmileFactory());
smileMapper.getJsonFactory().setCodec(smileMapper);
}
else if (jsonMapper == null || smileMapper == null) {
throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper);
}
if (lifecycle == null) {
lifecycle = new Lifecycle();
}
if (props == null) {
props = Initialization.loadProperties();
}
if (configFactory == null) {
configFactory = Config.createFactory(props);
}
return new BrokerNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
}
}
}

View File

@ -23,11 +23,13 @@ import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.ZKPhoneBook;
import com.metamx.druid.http.FileRequestLogger;
import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.utils.PropUtils;
import com.metamx.druid.zk.StringZkSerializer;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
@ -87,7 +89,7 @@ public class Initialization
return retVal;
}
public static ZKPhoneBook createYellowPages(
public static ZKPhoneBook createPhoneBook(
ObjectMapper jsonMapper, ZkClient zkClient, String threadNameFormat, Lifecycle lifecycle
)
{
@ -275,15 +277,11 @@ public class Initialization
return serviceProvider;
}
public static RequestLogger makeRequestLogger(ScheduledExecutorService exec, Properties props) throws IOException
public static RequestLogger makeRequestLogger(ScheduledExecutorFactory factory, Properties props) throws IOException
{
final String property = "druid.request.logging.dir";
final String loggingDir = props.getProperty(property);
if (loggingDir == null) {
throw new ISE("property[%s] not set.", property);
}
return new FileRequestLogger(exec, new File(loggingDir));
return new FileRequestLogger(
factory.create(1, "RequestLogger-%s"),
new File(PropUtils.getProperty(props, "druid.request.logging.dir"))
);
}
}

View File

@ -141,32 +141,4 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
}
);
}
private static class ThreadNamingCallable<T> implements Callable<T>
{
private final Callable<T> baseCallable;
private final String name;
ThreadNamingCallable(
Callable<T> baseCallable,
String name
)
{
this.baseCallable = baseCallable;
this.name = name;
}
@Override
public T call() throws Exception
{
String oldName = Thread.currentThread().getName();
try {
Thread.currentThread().setName(name);
return baseCallable.call();
}
finally {
Thread.currentThread().setName(oldName);
}
}
}
}

View File

@ -18,18 +18,13 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-common</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>druid-common</name>
<description>druid-common</description>
<scm>
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url>
</scm>
<parent>
<groupId>com.metamx</groupId>
@ -37,67 +32,15 @@
<version>0.1.0-SNAPSHOT</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.16.0</metamx.java-util.version>
</properties>
<dependencies>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
<version>${metamx.java-util.version}</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>11.0.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.9</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.9</version>
</dependency>
<dependency>
<groupId>it.uniroma3.mat</groupId>
<artifactId>extendedset</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.18</version>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
<version>2.32</version>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
@ -105,38 +48,79 @@
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.mozilla</groupId>
<artifactId>rhino</artifactId>
<version>1.7R4</version>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>it.uniroma3.mat</groupId>
<artifactId>extendedset</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.18</version>
</dependency>
<dependency>
<groupId>org.mozilla</groupId>
<artifactId>rhino</artifactId>
<version>1.7R4</version>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
<version>${metamx.java-util.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
@ -145,33 +129,7 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-help-plugin</artifactId>
<version>2.1.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7.2</version>
<configuration>
<systemPropertyVariables>
<user.timezone>UTC</user.timezone>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<goals>

View File

@ -88,7 +88,7 @@ public class VersionedIntervalTimeline<VersionType, ObjectType>
if (exists == null) {
entry = new TimelineEntry(interval, version, new PartitionHolder<ObjectType>(object));
TreeMap<VersionType, TimelineEntry> versionEntry = Maps.newTreeMap(versionComparator);
TreeMap<VersionType, TimelineEntry> versionEntry = new TreeMap<VersionType, TimelineEntry>(versionComparator);
versionEntry.put(version, entry);
allTimelineEntries.put(interval, versionEntry);
} else {

View File

@ -59,7 +59,6 @@ public class CombiningSequence<T> implements Sequence<T>
}
@Override
@SuppressWarnings("unchecked")
public <OutType> OutType accumulate(OutType initValue, final Accumulator<OutType, T> accumulator)
{
final AtomicReference<OutType> retVal = new AtomicReference<OutType>(initValue);

View File

@ -83,7 +83,6 @@ public class JodaStuff
super(DateTime.class);
}
@SuppressWarnings("unchecked")
@Override
public DateTime deserialize(JsonParser jp, DeserializationContext ctxt)
throws IOException, JsonProcessingException

View File

@ -18,69 +18,22 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId>
<packaging>pom</packaging>
<version>0.1.0-SNAPSHOT</version>
<name>druid-examples</name>
<description>druid-examples</description>
<scm>
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url>
</scm>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.1.0-SNAPSHOT</version>
</parent>
<modules>
<module>rand</module>
<module>twitter</module>
</modules>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7.2</version>
<configuration>
<systemPropertyVariables>
<user.timezone>UTC</user.timezone>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>pub-libs</id>
<name>pub-libs-local</name>
<url>https://metamx.artifactoryonline.com/metamx/pub-libs-releases-local</url>
</repository>
<repository>
<id>repo.codahale.com</id>
<url>http://repo.codahale.com</url>
</repository>
<repository>
<id>nativelibs4java</id>
<url>http://nativelibs4java.sourceforge.net/maven</url>
</repository>
<repository>
<id>thirdparty-uploads</id>
<name>JBoss Thirdparty Uploads</name>
<url>https://repository.jboss.org/nexus/content/repositories/thirdparty-uploads</url>
</repository>
</repositories>
<distributionManagement>
<repository>
<id>central-local</id>
<name>Central</name>
<url>https://metamx.artifactoryonline.com/metamx/pub-libs-releases-local</url>
</repository>
</distributionManagement>
</project>

View File

@ -1,20 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-examples-rand</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>druid-examples-rand</name>
<description>druid-examples-rand</description>
<scm>
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url>
</scm>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<parent>
<groupId>com.metamx</groupId>
@ -41,117 +32,111 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
<version>0.6.1</version>
<artifactId>emitter</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
<version>0.0.7</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.26</version>
<artifactId>http-client</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>11.0.1</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
<version>1.9.9</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-smile</artifactId>
<version>1.9.9</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
<version>0.9</version>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId>
<version>0.0.2</version>
</dependency>
<dependency>
<groupId>com.davekoelle</groupId>
<artifactId>alphanum</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>
<version>4.8.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-smile</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.0</version>
<scope>test</scope>
</dependency>
</dependencies>
@ -159,18 +144,7 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<phase>package</phase>
@ -186,24 +160,7 @@
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-help-plugin</artifactId>
<version>2.1.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7.2</version>
<configuration>
<systemPropertyVariables>
<user.timezone>UTC</user.timezone>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<goals>

View File

@ -1,20 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-examples-twitter</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>druid-examples-twitter</name>
<description>druid-examples-twitter</description>
<scm>
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url>
</scm>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<parent>
<groupId>com.metamx</groupId>
@ -41,15 +32,97 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
<version>0.6.1</version>
<artifactId>emitter</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
<version>0.0.7</version>
<artifactId>http-client</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId>
</dependency>
<dependency>
<groupId>com.davekoelle</groupId>
<artifactId>alphanum</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-smile</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
@ -65,109 +138,20 @@
<artifactId>twitter4j-stream</artifactId>
<version>2.2.6</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.26</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>11.0.1</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
<version>1.9.9</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-smile</artifactId>
<version>1.9.9</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId>
<version>0.0.2</version>
</dependency>
<dependency>
<groupId>com.davekoelle</groupId>
<artifactId>alphanum</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>
<version>4.8.1</version>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.0</version>
<scope>test</scope>
</dependency>
</dependencies>
@ -175,18 +159,7 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<phase>package</phase>
@ -202,24 +175,7 @@
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-help-plugin</artifactId>
<version>2.1.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7.2</version>
<configuration>
<systemPropertyVariables>
<user.timezone>UTC</user.timezone>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<goals>

View File

@ -18,18 +18,13 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-index-common</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>druid-index-common</name>
<description>Druid Indexer</description>
<scm>
<connection>scm:git:git://github.com/metamx/druid-index-common.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid-index-common.git</developerConnection>
<url>http://github.com/metamx/druid-index-common</url>
</scm>
<parent>
<groupId>com.metamx</groupId>
@ -43,53 +38,46 @@
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>com.ning</groupId>
<artifactId>compress-lzf</artifactId>
<version>0.8.4</version>
</dependency>
<dependency>
<groupId>it.uniroma3.mat</groupId>
<artifactId>extendedset</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<version>0.8.1</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7.2</version>
<configuration>
<systemPropertyVariables>
<user.timezone>UTC</user.timezone>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

View File

@ -77,7 +77,7 @@ public class IndexIO
public static void registerHandler(IndexIOHandler handler)
{
if (handler == null) {
if (IndexIO.handler == null) {
IndexIO.handler = handler;
}
else {

View File

@ -17,14 +17,17 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination.legacy;
import org.skife.config.Config;
package com.metamx.druid.index.v1.serde;
/**
* This is a "factory" interface for registering complex metrics in the system. It exists because I'm unaware of
* another way to register the complex serdes in the MR jobs that run on Hadoop. As such, instances of this interface
* must be instantiatable via a no argument default constructor (the MR jobs on Hadoop use reflection to instantiate
* instances).
*
* The name is not a typo, I felt that it needed an extra "er" to make the pronunciation that much more difficult.
*/
public abstract class TheSizeAdjusterConfig
public interface ComplexMetricRegistererer
{
@Config("druid.zk.paths.indexesPath")
public abstract String getSegmentBasePath();
public void register();
}

View File

@ -18,18 +18,13 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-indexer</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>druid-indexer</name>
<description>Druid Indexer</description>
<scm>
<connection>scm:git:git://github.com/metamx/druid-indexer.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid-indexer.git</developerConnection>
<url>http://github.com/metamx/druid-indexer</url>
</scm>
<parent>
<groupId>com.metamx</groupId>
@ -50,10 +45,10 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.2</version>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
@ -65,12 +60,48 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.2</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
@ -78,18 +109,7 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<phase>package</phase>
@ -112,8 +132,4 @@
</plugin>
</plugins>
</build>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

View File

@ -18,18 +18,13 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-merger</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>druid-merger</name>
<description>druid-merger</description>
<scm>
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url>
</scm>
<parent>
<groupId>com.metamx</groupId>
@ -38,6 +33,11 @@
</parent>
<dependencies>
<dependency>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-server</artifactId>
@ -48,6 +48,11 @@
<artifactId>druid-client</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-index-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-indexer</artifactId>
@ -59,17 +64,118 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.3</version>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.0</version>
<scope>test</scope>
</dependency>
</dependencies>
@ -77,18 +183,7 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<phase>package</phase>
@ -103,21 +198,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-help-plugin</artifactId>
<version>2.1.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7.2</version>
<configuration>
<systemPropertyVariables>
<user.timezone>UTC</user.timezone>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
</project>

279
pom.xml
View File

@ -18,7 +18,8 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
@ -32,6 +33,10 @@
<url>http://www.github.com/metamx/druid</url>
</scm>
<prerequisites>
<maven>3.0.3</maven>
</prerequisites>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
@ -47,19 +52,287 @@
<module>examples</module>
</modules>
<dependencyManagement>
<dependencies>
<!-- Compile Scope -->
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
<version>0.0.7</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
<version>0.6.1</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
<version>0.16.0</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId>
<version>0.0.2</version>
</dependency>
<dependency>
<groupId>com.davekoelle</groupId>
<artifactId>alphanum</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.ning</groupId>
<artifactId>compress-lzf</artifactId>
<version>0.8.4</version>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-client</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>it.uniroma3.mat</groupId>
<artifactId>extendedset</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>11.0.1</version>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>
<version>4.8.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.9</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
<version>1.9.9</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.9</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-smile</artifactId>
<version>1.9.9</version>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
</dependency>
<dependency>
<groupId>com.jamesmurty.utils</groupId>
<artifactId>java-xmlbuilder</artifactId>
<version>0.4</version>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
<version>2.32</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<version>0.8.1</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.26</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5-20081211</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
<!-- Test Scope -->
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
<type>test-jar</type>
<scope>test</scope>
<version>0.16.0</version>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
<version>2.5</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.5</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
</plugin>
<plugin>
<artifactId>maven-help-plugin</artifactId>
<version>2.1.1</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.3.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>2.5</version>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>1.7.1</version>
</plugin>
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.1</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7.2</version>
<version>2.12.2</version>
<configuration>
<systemPropertyVariables>
<user.timezone>UTC</user.timezone>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
</plugin>
</plugins>
</pluginManagement>
</build>
<repositories>

View File

@ -18,18 +18,13 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-realtime</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>druid-realtime</name>
<description>druid-realtime</description>
<scm>
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url>
</scm>
<parent>
<groupId>com.metamx</groupId>
@ -52,15 +47,72 @@
<artifactId>druid-client</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId>
</dependency>
<dependency>
<groupId>org.apache.directory.studio</groupId>
<artifactId>org.apache.commons.collections</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-smile</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>kafka</groupId>
@ -74,9 +126,18 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.directory.studio</groupId>
<artifactId>org.apache.commons.collections</artifactId>
<version>3.2.1</version>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<!-- Override deps so that our jackson is compatible with druid-server -->
@ -90,25 +151,21 @@
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.3</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.jamesmurty.utils</groupId>
<artifactId>java-xmlbuilder</artifactId>
<version>0.4</version>
<optional>true</optional>
</dependency>
<!-- Dependencies required for jets3t -->
@ -117,13 +174,11 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.0</version>
<scope>test</scope>
</dependency>
<dependency>
@ -139,18 +194,7 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<phase>package</phase>
@ -165,20 +209,9 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-help-plugin</artifactId>
<version>2.1.1</version>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<configuration>
<args>
<arg>-unchecked</arg>
@ -208,16 +241,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7.2</version>
<configuration>
<systemPropertyVariables>
<user.timezone>UTC</user.timezone>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -19,74 +19,53 @@
package com.metamx.druid.realtime;
import com.google.common.collect.Lists;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.client.ClientConfig;
import com.metamx.druid.client.ClientInventoryManager;
import com.metamx.druid.client.MutableServerView;
import com.metamx.druid.client.OnlyNewSegmentWatcherServerView;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.initialization.ZkClientConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import com.metamx.phonebook.PhoneBook;
import org.I0Itec.zkclient.ZkClient;
import org.codehaus.jackson.map.BeanProperty;
import org.codehaus.jackson.map.DeserializationContext;
import org.codehaus.jackson.map.InjectableValues;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.jsontype.NamedType;
import org.codehaus.jackson.smile.SmileFactory;
import org.codehaus.jackson.type.TypeReference;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class RealtimeNode
public class RealtimeNode extends BaseServerNode<RealtimeNode>
{
private static final Logger log = new Logger(RealtimeNode.class);
@ -95,155 +74,112 @@ public class RealtimeNode
return new Builder();
}
private final Lifecycle lifecycle;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final Properties props;
private final ConfigurationObjectFactory configFactory;
private final Map<String, Object> injectablesMap = Maps.newLinkedHashMap();
private PhoneBook phoneBook = null;
private ServiceEmitter emitter = null;
private ServerView view = null;
private MetadataUpdater metadataUpdater = null;
private QueryRunnerFactoryConglomerate conglomerate = null;
private SegmentPusher segmentPusher = null;
private List<FireDepartment> fireDepartments = null;
private List<Monitor> monitors = null;
private Server server = null;
private ServerView view = null;
private boolean initialized = false;
public RealtimeNode(
Properties props,
Lifecycle lifecycle,
ObjectMapper jsonMapper,
ObjectMapper smileMapper,
Lifecycle lifecycle,
Properties props,
ConfigurationObjectFactory configFactory
)
{
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.lifecycle = lifecycle;
this.props = props;
this.configFactory = configFactory;
}
public RealtimeNode setPhoneBook(PhoneBook phoneBook)
{
this.phoneBook = phoneBook;
return this;
}
public RealtimeNode setEmitter(ServiceEmitter emitter)
{
this.emitter = emitter;
return this;
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
}
public RealtimeNode setView(ServerView view)
{
Preconditions.checkState(this.view == null, "Cannot set view once it has already been set.");
this.view = view;
return this;
}
public RealtimeNode setMetadataUpdater(MetadataUpdater metadataUpdater)
{
Preconditions.checkState(this.metadataUpdater == null, "Cannot set metadataUpdater once it has already been set.");
this.metadataUpdater = metadataUpdater;
return this;
}
public RealtimeNode setConglomerate(QueryRunnerFactoryConglomerate conglomerate)
{
this.conglomerate = conglomerate;
return this;
}
public RealtimeNode setSegmentPusher(SegmentPusher segmentPusher)
{
Preconditions.checkState(this.segmentPusher == null, "Cannot set segmentPusher once it has already been set.");
this.segmentPusher = segmentPusher;
return this;
}
public RealtimeNode setFireDepartments(List<FireDepartment> fireDepartments)
{
Preconditions.checkState(this.fireDepartments == null, "Cannot set fireDepartments once it has already been set.");
this.fireDepartments = fireDepartments;
return this;
}
public RealtimeNode setMonitors(List<Monitor> monitors)
{
this.monitors = Lists.newArrayList(monitors);
return this;
}
public void setServer(Server server)
{
this.server = server;
}
public RealtimeNode registerJacksonInjectable(String name, Object object)
{
Preconditions.checkState(injectablesMap.containsKey(name), "Already registered jackson object[%s]", name);
injectablesMap.put(name, object);
return this;
}
public RealtimeNode registerJacksonSubtype(Class<?>... clazzes)
public MetadataUpdater getMetadataUpdater()
{
jsonMapper.registerSubtypes(clazzes);
return this;
initializeMetadataUpdater();
return metadataUpdater;
}
public RealtimeNode registerJacksonSubtype(NamedType... namedTypes)
public SegmentPusher getSegmentPusher()
{
jsonMapper.registerSubtypes(namedTypes);
return this;
initializeSegmentPusher();
return segmentPusher;
}
private void init() throws Exception
public List<FireDepartment> getFireDepartments()
{
if (phoneBook == null) {
final ZkClient zkClient = Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle);
phoneBook = Initialization.createYellowPages(
jsonMapper,
zkClient,
"Realtime-ZKYP--%s",
lifecycle
);
initializeFireDepartments();
return fireDepartments;
}
initializeEmitter();
public ServerView getView()
{
initializeView();
return view;
}
protected void doInit() throws Exception
{
initializeView();
initializeMetadataUpdater();
initializeQueryRunnerFactoryConglomerate();
initializeSegmentPusher();
initializeMonitors();
initializeServer();
initializeJacksonInjectables();
initializeFireDepartments();
final Lifecycle lifecycle = getLifecycle();
final ServiceEmitter emitter = getEmitter();
final QueryRunnerFactoryConglomerate conglomerate = getConglomerate();
final List<Monitor> monitors = getMonitors();
monitors.add(new RealtimeMetricsMonitor(fireDepartments));
final RealtimeManager realtimeManager = new RealtimeManager(fireDepartments, conglomerate);
lifecycle.addManagedInstance(realtimeManager);
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
final MonitorScheduler monitorScheduler = new MonitorScheduler(
configFactory.build(MonitorSchedulerConfig.class),
globalScheduledExec,
emitter,
monitors
);
lifecycle.addManagedInstance(monitorScheduler);
startMonitoring(monitors);
final RequestLogger requestLogger = Initialization.makeRequestLogger(globalScheduledExec, props);
lifecycle.addManagedInstance(requestLogger);
final Context v2Druid = new Context(server, "/druid/v2", Context.SESSIONS);
final Context v2Druid = new Context(getServer(), "/druid/v2", Context.SESSIONS);
v2Druid.addServlet(new ServletHolder(new StatusServlet()), "/status");
v2Druid.addServlet(
new ServletHolder(new QueryServlet(jsonMapper, smileMapper, realtimeManager, emitter, requestLogger)),
new ServletHolder(
new QueryServlet(getJsonMapper(), getSmileMapper(), realtimeManager, emitter, getRequestLogger())
),
"/*"
);
@ -257,47 +193,16 @@ public class RealtimeNode
init();
}
lifecycle.start();
getLifecycle().start();
}
@LifecycleStop
public synchronized void stop()
{
lifecycle.stop();
getLifecycle().stop();
}
private void initializeServer()
{
if (server == null) {
server = Initialization.makeJettyServer(configFactory.build(ServerConfig.class));
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
log.info("Starting Jetty");
server.start();
}
@Override
public void stop()
{
log.info("Stopping Jetty");
try {
server.stop();
}
catch (Exception e) {
log.error(e, "Exception thrown while stopping Jetty");
}
}
}
);
}
}
private void initializeJacksonInjectables()
protected void initializeJacksonInjectables()
{
final Map<String, Object> injectables = Maps.newHashMap();
@ -305,13 +210,13 @@ public class RealtimeNode
injectables.put(entry.getKey(), entry.getValue());
}
injectables.put("queryRunnerFactoryConglomerate", conglomerate);
injectables.put("queryRunnerFactoryConglomerate", getConglomerate());
injectables.put("segmentPusher", segmentPusher);
injectables.put("metadataUpdater", metadataUpdater);
injectables.put("serverView", view);
injectables.put("serviceEmitter", emitter);
injectables.put("serviceEmitter", getEmitter());
jsonMapper.setInjectableValues(
getJsonMapper().setInjectableValues(
new InjectableValues()
{
@Override
@ -325,96 +230,70 @@ public class RealtimeNode
);
}
private void initializeMonitors()
{
if (monitors == null) {
monitors = Lists.newArrayList();
monitors.add(new JvmMonitor());
monitors.add(new SysMonitor());
}
}
private void initializeFireDepartments() throws IOException
private void initializeFireDepartments()
{
if (fireDepartments == null) {
fireDepartments = jsonMapper.readValue(
new File(PropUtils.getProperty(props, "druid.realtime.specFile")),
try {
fireDepartments = getJsonMapper().readValue(
new File(PropUtils.getProperty(getProps(), "druid.realtime.specFile")),
new TypeReference<List<FireDepartment>>(){}
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
}
private void initializeSegmentPusher() throws S3ServiceException
private void initializeSegmentPusher()
{
if (segmentPusher == null) {
final RestS3Service s3Client = new RestS3Service(
final Properties props = getProps();
final RestS3Service s3Client;
try {
s3Client = new RestS3Service(
new AWSCredentials(
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
);
}
catch (S3ServiceException e) {
throw Throwables.propagate(e);
}
segmentPusher = new S3SegmentPusher(s3Client, configFactory.build(S3SegmentPusherConfig.class), jsonMapper);
segmentPusher = new S3SegmentPusher(s3Client, getConfigFactory().build(S3SegmentPusherConfig.class), getJsonMapper());
}
}
private void initializeQueryRunnerFactoryConglomerate()
{
if (conglomerate == null) {
StupidPool<ByteBuffer> computationBufferPool = ServerInit.makeComputeScratchPool(
PropUtils.getPropertyAsInt(props, "druid.computation.buffer.size", 1024 * 1024 * 1024)
);
conglomerate = new DefaultQueryRunnerFactoryConglomerate(
ServerInit.initDefaultQueryTypes(configFactory, computationBufferPool)
);
}
}
private void initializeMetadataUpdater()
protected void initializeMetadataUpdater()
{
if (metadataUpdater == null) {
metadataUpdater = new MetadataUpdater(
jsonMapper,
configFactory.build(MetadataUpdaterConfig.class),
phoneBook,
new DbConnector(configFactory.build(DbConnectorConfig.class)).getDBI()
getJsonMapper(),
getConfigFactory().build(MetadataUpdaterConfig.class),
getPhoneBook(),
new DbConnector(getConfigFactory().build(DbConnectorConfig.class)).getDBI()
);
lifecycle.addManagedInstance(metadataUpdater);
getLifecycle().addManagedInstance(metadataUpdater);
}
}
private void initializeView()
{
if (view == null) {
final ClientConfig clientConfig = configFactory.build(ClientConfig.class);
final MutableServerView view = new OnlyNewSegmentWatcherServerView();
final ClientInventoryManager clientInventoryManager = new ClientInventoryManager(
clientConfig.getClientInventoryManagerConfig(),
phoneBook,
getConfigFactory().build(ClientConfig.class),
getPhoneBook(),
view
);
lifecycle.addManagedInstance(clientInventoryManager);
getLifecycle().addManagedInstance(clientInventoryManager);
this.view = view;
}
}
private void initializeEmitter()
{
if (emitter == null) {
final HttpClient httpClient = HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(1).build(), lifecycle
);
emitter = new ServiceEmitter(
PropUtils.getProperty(props, "druid.service"),
PropUtils.getProperty(props, "druid.host"),
Emitters.create(props, httpClient, jsonMapper, lifecycle)
);
}
EmittingLogger.registerEmitter(emitter);
}
public static class Builder
{
private ObjectMapper jsonMapper = null;
@ -465,7 +344,7 @@ public class RealtimeNode
configFactory = Config.createFactory(props);
}
return new RealtimeNode(jsonMapper, smileMapper, lifecycle, props, configFactory);
return new RealtimeNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
}
}
}

View File

@ -18,18 +18,13 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-server</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>druid-server</name>
<description>druid-server</description>
<scm>
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url>
</scm>
<parent>
<groupId>com.metamx</groupId>
@ -50,9 +45,26 @@
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.26</version>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
@ -62,92 +74,127 @@
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>11.0.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
<dependency>
<groupId>com.ning</groupId>
<artifactId>compress-lzf</artifactId>
<version>0.8.4</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>1.9.1</version>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
<version>1.9.1</version>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
<version>1.9.1</version>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
</dependency>
<dependency>
<groupId>it.uniroma3.mat</groupId>
<artifactId>extendedset</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
<version>1.9.6</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-smile</artifactId>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<!-- Dependencies required for jets3t b/c emr pom doesn't include them -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.3</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.jamesmurty.utils</groupId>
<artifactId>java-xmlbuilder</artifactId>
<version>0.4</version>
<optional>true</optional>
</dependency>
<!-- Dependencies required for jets3t -->
@ -156,13 +203,11 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.0</version>
<scope>test</scope>
</dependency>
<dependency>
@ -178,18 +223,7 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<phase>package</phase>
@ -204,31 +238,8 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-help-plugin</artifactId>
<version>2.1.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7.2</version>
<configuration>
<systemPropertyVariables>
<user.timezone>UTC</user.timezone>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<goals>

View File

@ -68,8 +68,11 @@ public class TsvToJson
}
}
BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(inFile), Charsets.UTF_8));
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outFile), Charsets.UTF_8));
BufferedReader in = null;
BufferedWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(new FileInputStream(inFile), Charsets.UTF_8));
out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outFile), Charsets.UTF_8));
String line = null;
int count = 0;
long currTime = System.currentTimeMillis();
@ -102,9 +105,15 @@ public class TsvToJson
}
System.out.printf("Completed %,d lines in %,d millis.%n", count, System.currentTimeMillis() - startTime);
out.flush();
} finally {
if (out != null) {
out.close();
}
if (in != null) {
in.close();
}
}
}
public static interface FieldHandler
{

View File

@ -0,0 +1,124 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.skife.config.ConfigurationObjectFactory;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Properties;
/**
*/
public abstract class BaseServerNode<T extends BaseNode> extends BaseNode<T>
{
private final Map<Class<? extends Query>, QueryRunnerFactory> additionalFactories = Maps.newLinkedHashMap();
private QueryRunnerFactoryConglomerate conglomerate = null;
private StupidPool<ByteBuffer> computeScratchPool = null;
public BaseServerNode(
Logger log,
Properties props,
Lifecycle lifecycle,
ObjectMapper jsonMapper,
ObjectMapper smileMapper,
ConfigurationObjectFactory configFactory
)
{
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
}
public QueryRunnerFactoryConglomerate getConglomerate()
{
initializeQueryRunnerFactoryConglomerate();
return conglomerate;
}
public StupidPool<ByteBuffer> getComputeScratchPool()
{
initializeComputeScratchPool();
return computeScratchPool;
}
@SuppressWarnings("unchecked")
public T setConglomerate(QueryRunnerFactoryConglomerate conglomerate)
{
checkFieldNotSetAndSet("conglomerate", conglomerate);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setComputeScratchPool(StupidPool<ByteBuffer> computeScratchPool)
{
checkFieldNotSetAndSet("computeScratchPool", computeScratchPool);
return (T) this;
}
@SuppressWarnings("unchecked")
public T registerQueryRunnerFactory(Class<? extends Query> queryClazz, QueryRunnerFactory factory)
{
Preconditions.checkState(
conglomerate == null,
"Registering a QueryRunnerFactory only works when a separate conglomerate is not specified."
);
Preconditions.checkState(
!additionalFactories.containsKey(queryClazz), "Registered factory for class[%s] multiple times", queryClazz
);
additionalFactories.put(queryClazz, factory);
return (T) this;
}
private void initializeComputeScratchPool()
{
if (computeScratchPool == null) {
setComputeScratchPool(
ServerInit.makeComputeScratchPool(
PropUtils.getPropertyAsInt(getProps(), "druid.computation.buffer.size", 1024 * 1024 * 1024)
)
);
}
}
private void initializeQueryRunnerFactoryConglomerate()
{
if (conglomerate == null) {
final Map<Class<? extends Query>, QueryRunnerFactory> factories = ServerInit.initDefaultQueryTypes(
getConfigFactory(), getComputeScratchPool()
);
for (Map.Entry<Class<? extends Query>, QueryRunnerFactory> entry : additionalFactories.entrySet()) {
factories.put(entry.getKey(), entry.getValue());
}
setConglomerate(new DefaultQueryRunnerFactoryConglomerate(factories));
}
}
}

View File

@ -21,6 +21,7 @@ package com.metamx.druid.coordination;
import com.google.common.base.Function;
import com.google.common.collect.Ordering;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.Query;
import com.metamx.druid.StorageAdapter;
@ -92,13 +93,17 @@ public class ServerManager implements QuerySegmentWalker
public Map<String, Long> getDataSourceSizes()
{
synchronized (dataSourceSizes) {
return dataSourceSizes.snapshot();
}
}
public Map<String, Long> getDataSourceCounts()
{
synchronized (dataSourceCounts) {
return dataSourceCounts.snapshot();
}
}
public void loadSegment(final DataSegment segment) throws StorageAdapterLoadingException
{
@ -107,7 +112,12 @@ public class ServerManager implements QuerySegmentWalker
adapter = storageAdapterLoader.getAdapter(segment.getLoadSpec());
}
catch (StorageAdapterLoadingException e) {
try {
storageAdapterLoader.cleanupAdapter(segment.getLoadSpec());
}
catch (StorageAdapterLoadingException e1) {
// ignore
}
throw e;
}
@ -138,10 +148,14 @@ public class ServerManager implements QuerySegmentWalker
loadedIntervals.add(
segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(adapter)
);
synchronized (dataSourceSizes) {
dataSourceSizes.add(dataSource, segment.getSize());
}
synchronized (dataSourceCounts) {
dataSourceCounts.add(dataSource, 1L);
}
}
}
public void dropSegment(final DataSegment segment) throws StorageAdapterLoadingException
{
@ -160,8 +174,12 @@ public class ServerManager implements QuerySegmentWalker
StorageAdapter oldQueryable = (removed == null) ? null : removed.getObject();
if (oldQueryable != null) {
synchronized (dataSourceSizes) {
dataSourceSizes.add(dataSource, -segment.getSize());
}
synchronized (dataSourceCounts) {
dataSourceCounts.add(dataSource, -1L);
}
} else {
log.info(
"Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.",
@ -179,10 +197,7 @@ public class ServerManager implements QuerySegmentWalker
{
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
log.makeAlert("Unknown query type, [%s]", query.getClass())
.addData("dataSource", query.getDataSource())
.emit();
return new NoopQueryRunner<T>();
throw new ISE("Unknown query type[%s].", query.getClass());
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
@ -209,7 +224,6 @@ public class ServerManager implements QuerySegmentWalker
new Function<TimelineObjectHolder<String, StorageAdapter>, Iterable<QueryRunner<T>>>()
{
@Override
@SuppressWarnings("unchecked")
public Iterable<QueryRunner<T>> apply(@Nullable final TimelineObjectHolder<String, StorageAdapter> holder)
{
if (holder == null) {

View File

@ -1,69 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination.legacy;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import java.util.Map;
/**
*/
public class S3SizeLookup implements SizeLookup
{
private static final Logger log = new Logger(S3SizeLookup.class);
private final RestS3Service s3Client;
public S3SizeLookup(
RestS3Service s3Client
)
{
this.s3Client = s3Client;
}
@Override
public Long lookupSize(Map<String, Object> loadSpec)
{
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
S3Object s3Obj = null;
try {
s3Obj = s3Client.getObjectDetails(new S3Bucket(s3Bucket), s3Path);
}
catch (S3ServiceException e) {
log.warn(e, "Exception when trying to lookup size for s3://%s/%s", s3Bucket, s3Path);
return null;
}
if (s3Obj == null) {
log.warn("s3Object for s3://%s/%s was null.", s3Bucket, s3Path);
return null;
}
return s3Obj.getContentLength();
}
}

View File

@ -1,106 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination.legacy;
import com.google.common.base.Joiner;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import org.I0Itec.zkclient.ZkClient;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.util.Map;
/**
*/
public class TheSizeAdjuster
{
private static final Logger log = new Logger(TheSizeAdjuster.class);
private static final Joiner JOINER = Joiner.on("/");
private final TheSizeAdjusterConfig config;
private final ObjectMapper jsonMapper;
private final Map<String, SizeLookup> lookups;
private final ZkClient zkClient;
public TheSizeAdjuster(
TheSizeAdjusterConfig config,
ObjectMapper jsonMapper,
Map<String, SizeLookup> lookups,
ZkClient zkClient
)
{
this.config = config;
this.jsonMapper = jsonMapper;
this.lookups = lookups;
this.zkClient = zkClient;
}
public Long lookupSize(Map<String, Object> descriptor)
{
String type = MapUtils.getString(descriptor, "type");
SizeLookup adjuster = lookups.get(type);
if (adjuster == null) {
log.warn("Unknown type[%s] for SizeAdjuster, known types are %s", type, lookups.keySet());
return null;
}
return adjuster.lookupSize(descriptor);
}
public DataSegment updateDescriptor(DataSegment dataSegment)
{
Long size = lookupSize(dataSegment.getLoadSpec());
if (size == null || size < 0) {
log.warn("Unable to determine size[%s] of segment[%s], ignoring.", size, dataSegment);
return null;
}
final DataSegment segment = new DataSegment(
dataSegment.getDataSource(),
dataSegment.getInterval(),
dataSegment.getVersion() + "_w_size",
dataSegment.getLoadSpec(),
dataSegment.getDimensions(),
dataSegment.getMetrics(),
dataSegment.getShardSpec(),
size
);
String oldSegmentPath = JOINER.join(config.getSegmentBasePath(), dataSegment.getDataSource(), dataSegment.getIdentifier());
String newSegmentPath = JOINER.join(config.getSegmentBasePath(), segment.getDataSource(), segment.getIdentifier());
try {
String data = jsonMapper.writeValueAsString(segment);
zkClient.createPersistent(newSegmentPath, data);
log.info("Created new segment node[%s] with content[%s]", newSegmentPath, data);
zkClient.delete(oldSegmentPath);
log.info("Deleted old segment node[%s]", oldSegmentPath);
}
catch (IOException e) {
log.warn(e, "Exception thrown on segment[%s]", segment);
return null;
}
return segment;
}
}

View File

@ -17,13 +17,36 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination.legacy;
package com.metamx.druid.http;
import java.util.Map;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.log.LogLevelAdjuster;
/**
*/
public interface SizeLookup
public class ComputeMain
{
public Long lookupSize(Map<String, Object> descriptor);
private static final Logger log = new Logger(ComputeMain.class);
public static void main(String[] args) throws Exception
{
LogLevelAdjuster.register();
Lifecycle lifecycle = new Lifecycle();
lifecycle.addManagedInstance(
ComputeNode.builder().build()
);
try {
lifecycle.start();
}
catch (Throwable t) {
log.info(t, "Throwable caught at startup, committing seppuku");
System.exit(2);
}
lifecycle.join();
}
}

View File

@ -0,0 +1,236 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.http;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.concurrent.ExecutorServices;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.coordination.ServerManager;
import com.metamx.druid.coordination.ZkCoordinator;
import com.metamx.druid.coordination.ZkCoordinatorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.QueryableLoaderConfig;
import com.metamx.druid.loading.StorageAdapterLoader;
import com.metamx.druid.metrics.ServerMonitor;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.Monitor;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.smile.SmileFactory;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
/**
*/
public class ComputeNode extends BaseServerNode<ComputeNode>
{
private static final Logger log = new Logger(ComputeNode.class);
public static Builder builder()
{
return new Builder();
}
private DruidServer druidServer;
private StorageAdapterLoader adapterLoader;
public ComputeNode(
Properties props,
Lifecycle lifecycle,
ObjectMapper jsonMapper,
ObjectMapper smileMapper,
ConfigurationObjectFactory configFactory
)
{
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
}
public ComputeNode setAdapterLoader(StorageAdapterLoader storageAdapterLoader)
{
Preconditions.checkState(this.adapterLoader == null, "Cannot set adapterLoader once it has already been set.");
this.adapterLoader = storageAdapterLoader;
return this;
}
public ComputeNode setDruidServer(DruidServer druidServer)
{
Preconditions.checkState(this.druidServer == null, "Cannot set druidServer once it has already been set.");
this.druidServer = druidServer;
return this;
}
public DruidServer getDruidServer()
{
initializeDruidServer();
return druidServer;
}
public StorageAdapterLoader getAdapterLoader()
{
initializeAdapterLoader();
return adapterLoader;
}
protected void doInit() throws Exception
{
initializeDruidServer();
initializeAdapterLoader();
final Lifecycle lifecycle = getLifecycle();
final ServiceEmitter emitter = getEmitter();
final List<Monitor> monitors = getMonitors();
final QueryRunnerFactoryConglomerate conglomerate = getConglomerate();
final ExecutorService executorService = ExecutorServices.create(
getLifecycle(),
getConfigFactory().buildWithReplacements(
ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
)
);
ServerManager serverManager = new ServerManager(adapterLoader, conglomerate, emitter, executorService);
final ZkCoordinator coordinator = new ZkCoordinator(
getJsonMapper(),
getConfigFactory().build(ZkCoordinatorConfig.class),
druidServer,
getPhoneBook(),
serverManager,
emitter
);
lifecycle.addManagedInstance(coordinator);
monitors.add(new ServerMonitor(getDruidServer(), serverManager));
startMonitoring(monitors);
final Context root = new Context(getServer(), "/", Context.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(
new ServletHolder(
new QueryServlet(getJsonMapper(), getSmileMapper(), serverManager, emitter, getRequestLogger())
),
"/*"
);
}
private void initializeAdapterLoader()
{
if (adapterLoader == null) {
final Properties props = getProps();
try {
final RestS3Service s3Client = new RestS3Service(
new AWSCredentials(
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
);
setAdapterLoader(
ServerInit.makeDefaultQueryableLoader(s3Client, getConfigFactory().build(QueryableLoaderConfig.class))
);
}
catch (S3ServiceException e) {
throw Throwables.propagate(e);
}
}
}
private void initializeDruidServer()
{
if (druidServer == null) {
setDruidServer(new DruidServer(getConfigFactory().build(DruidServerConfig.class)));
}
}
public static class Builder
{
private ObjectMapper jsonMapper = null;
private ObjectMapper smileMapper = null;
private Lifecycle lifecycle = null;
private Properties props = null;
private ConfigurationObjectFactory configFactory = null;
public Builder withMappers(ObjectMapper jsonMapper, ObjectMapper smileMapper)
{
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
return this;
}
public Builder withProps(Properties props)
{
this.props = props;
return this;
}
public Builder withConfigFactory(ConfigurationObjectFactory configFactory)
{
this.configFactory = configFactory;
return this;
}
public ComputeNode build()
{
if (jsonMapper == null && smileMapper == null) {
jsonMapper = new DefaultObjectMapper();
smileMapper = new DefaultObjectMapper(new SmileFactory());
smileMapper.getJsonFactory().setCodec(smileMapper);
}
else if (jsonMapper == null || smileMapper == null) {
throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper);
}
if (lifecycle == null) {
lifecycle = new Lifecycle();
}
if (props == null) {
props = Initialization.loadProperties();
}
if (configFactory == null) {
configFactory = Config.createFactory(props);
}
return new ComputeNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
}
}
}

View File

@ -19,10 +19,26 @@
package com.metamx.druid.http;
import java.net.URL;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import org.I0Itec.zkclient.ZkClient;
import org.codehaus.jackson.map.ObjectMapper;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.FilterHolder;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.DBI;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
@ -37,10 +53,6 @@ import com.metamx.druid.client.ServerInventoryManager;
import com.metamx.druid.client.ServerInventoryManagerConfig;
import com.metamx.druid.coordination.DruidClusterInfo;
import com.metamx.druid.coordination.DruidClusterInfoConfig;
import com.metamx.druid.coordination.legacy.S3SizeLookup;
import com.metamx.druid.coordination.legacy.SizeLookup;
import com.metamx.druid.coordination.legacy.TheSizeAdjuster;
import com.metamx.druid.coordination.legacy.TheSizeAdjusterConfig;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.db.DatabaseSegmentManagerConfig;
import com.metamx.druid.db.DbConnector;
@ -54,6 +66,7 @@ import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterConfig;
import com.metamx.druid.master.LoadQueuePeon;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.core.Emitters;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
@ -71,8 +84,6 @@ import com.netflix.curator.x.discovery.ServiceDiscovery;
import com.netflix.curator.x.discovery.ServiceProvider;
import org.I0Itec.zkclient.ZkClient;
import org.codehaus.jackson.map.ObjectMapper;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
@ -106,21 +117,14 @@ public class MasterMain
);
final ServiceEmitter emitter = new ServiceEmitter(
props.getProperty("druid.service"),
props.getProperty("druid.host"),
PropUtils.getProperty(props, "druid.service"),
PropUtils.getProperty(props, "druid.host"),
Emitters.create(props, httpClient, jsonMapper, lifecycle)
);
final RestS3Service s3Client = new RestS3Service(
new AWSCredentials(
props.getProperty("com.metamx.aws.accessKey"),
props.getProperty("com.metamx.aws.secretKey")
)
);
final ZkClient zkClient = Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle);
final PhoneBook masterYp = Initialization.createYellowPages(jsonMapper, zkClient, "Master-ZKYP--%s", lifecycle);
final PhoneBook masterYp = Initialization.createPhoneBook(jsonMapper, zkClient, "Master-ZKYP--%s", lifecycle);
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
final SegmentInventoryManager segmentInventoryManager =
@ -183,14 +187,6 @@ public class MasterMain
jsonMapper,
databaseSegmentManager,
serverInventoryManager,
new TheSizeAdjuster(
configFactory.build(TheSizeAdjusterConfig.class),
jsonMapper,
ImmutableMap.<String, SizeLookup>of(
"s3", new S3SizeLookup(s3Client)
),
zkClient
),
masterYp,
emitter,
scheduledExecutorFactory,

View File

@ -19,195 +19,26 @@
package com.metamx.druid.http;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.concurrent.ExecutorServices;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.Query;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.coordination.ServerManager;
import com.metamx.druid.coordination.ZkCoordinator;
import com.metamx.druid.coordination.ZkCoordinatorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.initialization.ZkClientConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.QueryableLoaderConfig;
import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.metrics.ServerMonitor;
import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.emitter.core.Emitters;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import com.metamx.phonebook.PhoneBook;
import org.I0Itec.zkclient.ZkClient;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.smile.SmileFactory;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
@Deprecated
public class ServerMain
{
private static final Logger log = new Logger(ServerMain.class);
public static void main(String[] args) throws Exception
{
LogLevelAdjuster.register();
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final ObjectMapper smileMapper = new DefaultObjectMapper(new SmileFactory());
smileMapper.getJsonFactory().setCodec(smileMapper);
final Properties props = Initialization.loadProperties();
final ConfigurationObjectFactory configFactory = Config.createFactory(props);
final Lifecycle lifecycle = new Lifecycle();
final HttpClient httpClient = HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(1).build(), lifecycle
);
final ServiceEmitter emitter = new ServiceEmitter(
props.getProperty("druid.service"),
props.getProperty("druid.host"),
Emitters.create(props, httpClient, jsonMapper, lifecycle)
);
final ExecutorService executorService = ExecutorServices.create(
lifecycle,
configFactory.buildWithReplacements(
ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
)
);
StupidPool<ByteBuffer> computationBufferPool = ServerInit.makeComputeScratchPool(
Integer.parseInt(props.getProperty("druid.computation.buffer.size", String.valueOf(1024 * 1024 * 1024)))
);
Map<Class<? extends Query>, QueryRunnerFactory> queryRunners = ServerInit.initDefaultQueryTypes(
configFactory,
computationBufferPool
);
final RestS3Service s3Client = new RestS3Service(
new AWSCredentials(props.getProperty("com.metamx.aws.accessKey"), props.getProperty("com.metamx.aws.secretKey"))
);
QueryableLoaderConfig queryableLoaderConfig = configFactory.build(QueryableLoaderConfig.class);
final ServerManager serverManager = new ServerManager(
ServerInit.makeDefaultQueryableLoader(s3Client, queryableLoaderConfig),
new DefaultQueryRunnerFactoryConglomerate(queryRunners),
emitter,
executorService
);
final ZkClient zkClient = Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle);
final DruidServer druidServer = new DruidServer(configFactory.build(DruidServerConfig.class));
final PhoneBook coordinatorYp = Initialization.createYellowPages(
jsonMapper,
zkClient,
"Coordinator-ZKYP--%s",
lifecycle
);
final ZkCoordinator coordinator = new ZkCoordinator(
jsonMapper,
configFactory.build(ZkCoordinatorConfig.class),
druidServer,
coordinatorYp,
serverManager,
emitter
);
lifecycle.addManagedInstance(coordinator);
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
final List<Monitor> monitors = Lists.<Monitor>newArrayList(
new ServerMonitor(druidServer, serverManager),
new JvmMonitor()
);
if (Boolean.parseBoolean(props.getProperty("druid.monitoring.monitorSystem", "true"))) {
monitors.add(new SysMonitor());
}
final MonitorScheduler healthMonitor = new MonitorScheduler(
configFactory.build(MonitorSchedulerConfig.class),
globalScheduledExec,
emitter,
monitors
);
lifecycle.addManagedInstance(healthMonitor);
final RequestLogger requestLogger = Initialization.makeRequestLogger(
scheduledExecutorFactory.create(
1,
"RequestLogger--%d"
),
props
);
lifecycle.addManagedInstance(requestLogger);
try {
lifecycle.start();
}
catch (Throwable t) {
log.error(t, "Error when starting up. Failing.");
System.exit(1);
}
Runtime.getRuntime().addShutdownHook(
new Thread(
new Runnable()
{
@Override
public void run()
{
log.info("Running shutdown hook");
lifecycle.stop();
}
}
)
);
final Server server = Initialization.makeJettyServer(configFactory.build(ServerConfig.class));
final Context root = new Context(server, "/", Context.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(
new ServletHolder(new QueryServlet(jsonMapper, smileMapper, serverManager, emitter, requestLogger)),
"/*"
);
server.start();
server.join();
System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
System.out.println("!@(*&$#!(*@(@*&$! You are running with ServerMain!!!! PLZ Stop. Use ComputeMain instead.");
System.out.println("K thx bye.");
ComputeMain.main(args);
}
}

View File

@ -413,9 +413,11 @@ public class IndexMerger
long startTime = System.currentTimeMillis();
File indexFile = new File(outDir, "index.drd");
FileOutputStream fileOutputStream = null;
FileChannel channel = null;
try {
channel = new FileOutputStream(indexFile).getChannel();
fileOutputStream = new FileOutputStream(indexFile);
channel = fileOutputStream.getChannel();
channel.write(ByteBuffer.wrap(new byte[]{IndexIO.CURRENT_VERSION_ID}));
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.stringStrategy).writeToChannel(channel);
@ -435,6 +437,8 @@ public class IndexMerger
finally {
Closeables.closeQuietly(channel);
channel = null;
Closeables.closeQuietly(fileOutputStream);
fileOutputStream = null;
}
IndexIO.checkFileSize(indexFile);
log.info("outDir[%s] completed index.drd in %,d millis.", outDir, System.currentTimeMillis() - startTime);

View File

@ -40,7 +40,6 @@ import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.SegmentInventoryManager;
import com.metamx.druid.client.ServerInventoryManager;
import com.metamx.druid.coordination.DruidClusterInfo;
import com.metamx.druid.coordination.legacy.TheSizeAdjuster;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
@ -77,7 +76,6 @@ public class DruidMaster
private final DruidClusterInfo clusterInfo;
private final DatabaseSegmentManager databaseSegmentManager;
private final ServerInventoryManager serverInventoryManager;
private final TheSizeAdjuster sizeAdjuster;
private final PhoneBook yp;
private final ServiceEmitter emitter;
private final ScheduledExecutorService exec;
@ -97,7 +95,6 @@ public class DruidMaster
ObjectMapper jsonMapper,
DatabaseSegmentManager databaseSegmentManager,
ServerInventoryManager serverInventoryManager,
TheSizeAdjuster sizeAdjuster,
PhoneBook zkPhoneBook,
ServiceEmitter emitter,
ScheduledExecutorFactory scheduledExecutorFactory,
@ -113,7 +110,6 @@ public class DruidMaster
this.databaseSegmentManager = databaseSegmentManager;
this.serverInventoryManager = serverInventoryManager;
this.sizeAdjuster = sizeAdjuster;
this.yp = zkPhoneBook;
this.emitter = emitter;
@ -354,16 +350,7 @@ public class DruidMaster
for (DataSegment dataSegment : dataSegments) {
if (dataSegment.getSize() < 0) {
log.info("No size on Segment[%s], setting.", dataSegment);
DataSegment newDataSegment = sizeAdjuster.updateDescriptor(dataSegment);
if (dataSegment == null) {
log.warn("newDataSegment was null with old dataSegment[%s]. Skipping.", dataSegment);
continue;
}
dataSegment = newDataSegment;
log.warn("No size on Segment[%s], wtf?", dataSegment);
}
availableSegments.add(dataSegment);
}

View File

@ -25,7 +25,6 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.ServerInventoryManager;
import com.metamx.druid.client.ZKPhoneBook;
import com.metamx.druid.coordination.legacy.TheSizeAdjuster;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.metrics.NoopServiceEmitter;
import com.metamx.phonebook.PhoneBook;
@ -47,7 +46,6 @@ public class DruidMasterTest
private PhoneBook yp;
private DatabaseSegmentManager databaseSegmentManager;
private ServerInventoryManager serverInventoryManager;
private TheSizeAdjuster theSizeAdjuster;
private ScheduledExecutorFactory scheduledExecutorFactory;
private DruidServer druidServer;
private DataSegment segment;
@ -69,9 +67,6 @@ public class DruidMasterTest
databaseSegmentManager = EasyMock.createNiceMock(DatabaseSegmentManager.class);
EasyMock.replay(databaseSegmentManager);
theSizeAdjuster = EasyMock.createNiceMock(TheSizeAdjuster.class);
EasyMock.replay(theSizeAdjuster);
scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class);
EasyMock.replay(scheduledExecutorFactory);
@ -143,7 +138,6 @@ public class DruidMasterTest
null,
databaseSegmentManager,
serverInventoryManager,
theSizeAdjuster,
yp,
new NoopServiceEmitter(),
scheduledExecutorFactory,