1) Initial commit converting bootstrapping to use Guice modules. Things don't completely work yet.

This commit is contained in:
cheddar 2013-05-14 11:25:57 -05:00
parent 913cf37060
commit 75a464abe2
53 changed files with 2002 additions and 257 deletions

View File

@ -25,7 +25,6 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
@ -42,9 +41,9 @@ import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.curator.CuratorConfig;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
@ -70,8 +69,6 @@ import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
@ -349,14 +346,10 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
private void initializeServerInventoryView()
{
if (serverInventoryView == null) {
final ExecutorService exec = Executors.newFixedThreadPool(
1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryView-%s").build()
);
serverInventoryView = new ServerInventoryView(
getConfigFactory().build(ServerInventoryViewConfig.class),
getZkPaths(),
getCuratorFramework(),
exec,
getJsonMapper()
);
lifecycle.addManagedInstance(serverInventoryView);

View File

@ -24,11 +24,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.MapMaker;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.curator.inventory.CuratorInventoryManager;
import com.metamx.druid.curator.inventory.CuratorInventoryManagerStrategy;
import com.metamx.druid.curator.inventory.InventoryManagerConfig;
import com.metamx.druid.guice.ManageLifecycle;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.emitter.EmittingLogger;
import org.apache.curator.framework.CuratorFramework;
@ -38,11 +41,11 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*/
@ManageLifecycle
public class ServerInventoryView implements ServerView, InventoryView
{
private static final EmittingLogger log = new EmittingLogger(ServerInventoryView.class);
@ -55,11 +58,11 @@ public class ServerInventoryView implements ServerView, InventoryView
private static final Map<String, Integer> removedSegments = new MapMaker().makeMap();
@Inject
public ServerInventoryView(
final ServerInventoryViewConfig config,
final ZkPathsConfig zkPaths,
final CuratorFramework curator,
final ExecutorService exec,
final ObjectMapper jsonMapper
)
{
@ -79,7 +82,7 @@ public class ServerInventoryView implements ServerView, InventoryView
return zkPaths.getServedSegmentsPath();
}
},
exec,
Execs.singleThreaded("ServerInventoryView-%s"),
new CuratorInventoryManagerStrategy<DruidServer, DataSegment>()
{
@Override

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.initialization;
package com.metamx.druid.curator;
import org.skife.config.Config;
import org.skife.config.Default;

View File

@ -0,0 +1,29 @@
package com.metamx.druid.curator;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.druid.guice.ConfigProvider;
import com.metamx.druid.guice.LazySingleton;
import com.metamx.druid.initialization.Initialization;
import org.apache.curator.framework.CuratorFramework;
import java.io.IOException;
/**
*/
/**
 * Guice bindings for Apache Curator: exposes {@code CuratorConfig} and a
 * single, lazily created {@code CuratorFramework} tied to the injected
 * {@code Lifecycle}.
 */
public class CuratorModule implements Module
{
  @Override
  public void configure(Binder binder)
  {
    // CuratorConfig values are materialized from runtime properties.
    ConfigProvider.bind(binder, CuratorConfig.class);
  }

  @Provides @LazySingleton
  public CuratorFramework makeCurator(CuratorConfig config, Lifecycle lifecycle) throws IOException
  {
    final CuratorFramework curator = Initialization.makeCuratorFramework(config, lifecycle);
    return curator;
  }
}

View File

@ -1,6 +1,7 @@
package com.metamx.druid.curator.discovery;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
@ -10,18 +11,19 @@ import java.util.Map;
/**
* Uses the Curator Service Discovery recipe to announce services.
*/
public class CuratorServiceAnnouncer<T> implements ServiceAnnouncer
public class CuratorServiceAnnouncer implements ServiceAnnouncer
{
private static final Logger log = new Logger(CuratorServiceAnnouncer.class);
private final ServiceDiscovery<T> discovery;
private final ServiceInstanceFactory<T> instanceFactory;
private final Map<String, ServiceInstance<T>> instanceMap = Maps.newHashMap();
private final ServiceDiscovery<Void> discovery;
private final ServiceInstanceFactory<Void> instanceFactory;
private final Map<String, ServiceInstance<Void>> instanceMap = Maps.newHashMap();
private final Object monitor = new Object();
@Inject
public CuratorServiceAnnouncer(
ServiceDiscovery<T> discovery,
ServiceInstanceFactory<T> instanceFactory
ServiceDiscovery<Void> discovery,
ServiceInstanceFactory<Void> instanceFactory
)
{
this.discovery = discovery;
@ -31,7 +33,7 @@ public class CuratorServiceAnnouncer<T> implements ServiceAnnouncer
@Override
public void announce(String service) throws Exception
{
final ServiceInstance<T> instance;
final ServiceInstance<Void> instance;
synchronized (monitor) {
if (instanceMap.containsKey(service)) {
@ -57,7 +59,7 @@ public class CuratorServiceAnnouncer<T> implements ServiceAnnouncer
@Override
public void unannounce(String service) throws Exception
{
final ServiceInstance<T> instance;
final ServiceInstance<Void> instance;
synchronized (monitor) {
instance = instanceMap.get(service);

View File

@ -0,0 +1,43 @@
package com.metamx.druid.curator.discovery;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.druid.guice.JsonConfigProvider;
import com.metamx.druid.guice.LazySingleton;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.initialization.Initialization;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
/**
*/
/**
 * Guice bindings for Curator-based service discovery: the discovery config,
 * the {@code ServiceAnnouncer} implementation, and providers for the
 * discovery client and the instance factory.
 */
public class DiscoveryModule implements Module
{
  @Override
  public void configure(Binder binder)
  {
    // Discovery settings come from the "druid.discovery.curator" property subtree.
    JsonConfigProvider.bind(binder, "druid.discovery.curator", CuratorDiscoveryConfig.class);
    binder.bind(ServiceAnnouncer.class).to(CuratorServiceAnnouncer.class).in(LazySingleton.class);
  }

  @Provides @LazySingleton
  public ServiceDiscovery<Void> getServiceDiscovery(
      CuratorFramework curator,
      CuratorDiscoveryConfig config,
      Lifecycle lifecycle
  ) throws Exception
  {
    final ServiceDiscovery<Void> discovery = Initialization.makeServiceDiscoveryClient(curator, config, lifecycle);
    return discovery;
  }

  @Provides @LazySingleton
  public ServiceInstanceFactory<Void> getServiceInstanceFactory(
      DruidNodeConfig nodeConfig
  )
  {
    return Initialization.makeServiceInstanceFactory(nodeConfig);
  }
}

View File

@ -42,10 +42,11 @@ import com.metamx.druid.client.cache.MapCache;
import com.metamx.druid.client.cache.MapCacheConfig;
import com.metamx.druid.client.cache.MemcachedCache;
import com.metamx.druid.client.cache.MemcachedCacheConfig;
import com.metamx.druid.curator.CuratorConfig;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.query.QueryToolChestWarehouse;
import com.metamx.druid.query.ReflectionQueryToolChestWarehouse;
@ -227,17 +228,18 @@ public class BrokerNode extends QueryableNode<BrokerNode>
{
if (useDiscovery) {
final Lifecycle lifecycle = getLifecycle();
final ServiceDiscoveryConfig serviceDiscoveryConfig = getConfigFactory().build(ServiceDiscoveryConfig.class);
final CuratorDiscoveryConfig curatorDiscoveryConfig = getConfigFactory().build(CuratorDiscoveryConfig.class);
final DruidNodeConfig nodeConfig = getConfigFactory().build(DruidNodeConfig.class);
final CuratorFramework curatorFramework = Initialization.makeCuratorFramework(
serviceDiscoveryConfig, lifecycle
getConfigFactory().build(CuratorConfig.class), lifecycle
);
final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
curatorFramework, serviceDiscoveryConfig, lifecycle
final ServiceDiscovery<Void> serviceDiscovery = Initialization.makeServiceDiscoveryClient(
curatorFramework, curatorDiscoveryConfig, lifecycle
);
final ServiceAnnouncer serviceAnnouncer = Initialization.makeServiceAnnouncer(
serviceDiscoveryConfig, serviceDiscovery
nodeConfig, serviceDiscovery
);
Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, lifecycle);
Initialization.announceDefaultService(nodeConfig, serviceAnnouncer, lifecycle);
}
}

View File

@ -0,0 +1,46 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.initialization;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.config.Config;
import com.metamx.druid.guice.LazySingleton;
import org.skife.config.ConfigurationObjectFactory;
import java.util.Properties;
/**
*/
/**
 * Binds a skife {@code ConfigurationObjectFactory} built from the injected
 * {@code Properties}, so config interfaces can be materialized on demand.
 */
public class ConfigFactoryModule implements Module
{
  @Override
  public void configure(Binder binder)
  {
    // Nothing to bind directly; the factory is supplied by the @Provides method below.
  }

  @Provides @LazySingleton
  public ConfigurationObjectFactory makeFactory(Properties props)
  {
    final ConfigurationObjectFactory factory = Config.createFactory(props);
    return factory;
  }
}

View File

@ -19,21 +19,22 @@
package com.metamx.druid.initialization;
import org.skife.config.Config;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public abstract class ServiceDiscoveryConfig extends CuratorConfig
public abstract class CuratorDiscoveryConfig
{
@Config("druid.service")
public abstract String getServiceName();
@JsonProperty
private String path = null;
@Config("druid.host")
public abstract String getHost();
public String getPath()
{
return path;
}
@Config("druid.port")
public abstract int getPort();
@Config("druid.zk.paths.discoveryPath")
public abstract String getDiscoveryPath();
public boolean useDiscovery()
{
return path != null;
}
}

View File

@ -0,0 +1,102 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.initialization;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.druid.utils.SocketUtil;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
/**
*/
/**
 * Node identity (service name, host, port) bound from JSON configuration.
 * The constructor normalizes host/port so that {@code host} always carries a
 * "host:port" form when a port is known, and {@code port} is -1 when unknown.
 * NOTE(review): validation annotations (@NotNull, @Min/@Max) presumably run
 * via a bean-validation pass elsewhere — confirm against the config loader.
 */
public abstract class DruidNodeConfig
{
// Service name under which this node announces itself; must be set.
@NotNull
private String serviceName = null;
// "host" or "host:port"; filled in by the constructor.
@NotNull
private String host = null;
// TCP port; -1 means "unknown". Valid range enforced by @Min/@Max.
@Min(0) @Max(0xffff)
private int port = -1;
@JsonCreator
public DruidNodeConfig(
@JsonProperty("service") String serviceName,
@JsonProperty("host") String host,
@JsonProperty("port") Integer port
)
{
this.serviceName = serviceName;
// No explicit port: derive it from the host string, or probe for a free one.
if (port == null) {
if (host == null) {
// Neither host nor port given: leave both unset.
setHostAndPort(null, -1);
}
else if (host.contains(":")) {
try {
// Host already carries a port suffix; parse it out.
setHostAndPort(host, Integer.parseInt(host.split(":")[1]));
}
catch (Exception e) {
// Unparseable port segment: keep the raw host, mark port unknown.
setHostAndPort(host, -1);
}
}
else {
// Bare hostname: probe upward from 8080 for an open port and append it.
final int openPort = SocketUtil.findOpenPort(8080);
setHostAndPort(String.format("%s:%d", host, openPort), openPort);
}
}
else {
// Explicit port: append it to the host unless host is null or already has one.
if (host == null || host.contains(":")) {
setHostAndPort(host, port);
}
else {
setHostAndPort(String.format("%s:%d", host, port), port);
}
}
}
// Single mutation point for the normalized host/port pair.
private void setHostAndPort(String host, int port)
{
this.host = host;
this.port = port;
}
@JsonProperty("service")
public String getServiceName()
{
return serviceName;
}
@JsonProperty
public String getHost()
{
return host;
}
@JsonProperty
public int getPort()
{
return port;
}
}

View File

@ -0,0 +1,128 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.initialization;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Binding;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.guice.DruidScopes;
import com.metamx.druid.guice.LazySingleton;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.service.ServiceEmitter;
import java.lang.annotation.Annotation;
import java.util.List;
import java.util.Properties;
/**
*/
/**
 * Wires up the emitter stack: installs the log and http emitter modules and
 * exposes a single {@code Emitter} selected by the "druid.emitter" property,
 * plus a {@code ServiceEmitter} wrapping it with this node's identity.
 */
public class EmitterModule implements Module
{
  private static final Logger log = new Logger(EmitterModule.class);

  private final Properties props;

  @Inject
  public EmitterModule(
      Properties props
  )
  {
    this.props = props;
  }

  @Override
  public void configure(Binder binder)
  {
    // Empty string selects no named emitter; EmitterProvider will then fail
    // with the list of known types.
    String emitterType = props.getProperty("druid.emitter", "");

    binder.install(new LogEmitterModule());
    binder.install(new HttpEmitterModule());

    binder.bind(Emitter.class).toProvider(new EmitterProvider(emitterType)).in(DruidScopes.SINGLETON);
  }

  @Provides
  @LazySingleton
  public ServiceEmitter getServiceEmitter(DruidNodeConfig config, Emitter emitter)
  {
    final ServiceEmitter retVal = new ServiceEmitter(config.getServiceName(), config.getHost(), emitter);
    // Register globally so EmittingLogger users pick up this emitter.
    EmittingLogger.registerEmitter(retVal);
    return retVal;
  }

  /**
   * Resolves the {@code Emitter} binding annotated with @Named(emitterType)
   * by scanning the injector's Emitter bindings.
   */
  private static class EmitterProvider implements Provider<Emitter>
  {
    private final String emitterType;

    private Emitter emitter = null;

    EmitterProvider(
        String emitterType
    )
    {
      this.emitterType = emitterType;
    }

    @Inject
    public void inject(Injector injector)
    {
      final List<Binding<Emitter>> emitterBindings = injector.findBindingsByType(new TypeLiteral<Emitter>(){});

      // Pick the binding whose @Named annotation matches the configured type.
      for (Binding<Emitter> binding : emitterBindings) {
        if (Names.named(emitterType).equals(binding.getKey().getAnnotation())) {
          emitter = binding.getProvider().get();
          break;
        }
      }

      if (emitter == null) {
        // Collect the known @Named emitter types for a helpful error message.
        List<String> knownTypes = Lists.newArrayList();
        for (Binding<Emitter> binding : emitterBindings) {
          final Annotation annotation = binding.getKey().getAnnotation();
          if (annotation != null) {
            knownTypes.add(((Named) annotation).value());
          }
        }
        // Fixed typo in message: "Uknown" -> "Unknown".
        throw new ISE("Unknown emitter type, known types[%s]", knownTypes);
      }
    }

    @Override
    public Emitter get()
    {
      if (emitter == null) {
        throw new ISE("Emitter was null, that's bad!");
      }
      return emitter;
    }
  }
}

View File

@ -0,0 +1,36 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.initialization;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Duration;
/**
*/
/**
 * Extends the base http emitter config with a configurable read timeout
 * (default 5 minutes) used when building the underlying http client.
 */
public abstract class HttpEmitterConfig extends com.metamx.emitter.core.HttpEmitterConfig
{
  // Default read timeout. Built via Duration.standardMinutes rather than the
  // string form "PT5m": Joda-Time's Duration string converter only accepts
  // the seconds-only ISO form ("PTxS"), so new Duration("PT5m") throws
  // IllegalArgumentException as soon as the field initializer runs.
  @JsonProperty
  private Duration timeOut = Duration.standardMinutes(5);

  public Duration getReadTimeout()
  {
    return timeOut;
  }
}

View File

@ -0,0 +1,62 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.initialization;
import com.google.common.base.Supplier;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Named;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.druid.guice.JsonConfigProvider;
import com.metamx.druid.guice.LazySingleton;
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.HttpPostEmitter;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
/**
*/
/**
 * Binds the "http" emitter: posts events over http via a single-connection
 * client whose read timeout comes from {@code HttpEmitterConfig}.
 */
public class HttpEmitterModule implements Module
{
  @Override
  public void configure(Binder binder)
  {
    JsonConfigProvider.bind(binder, "druid.emitter.http", HttpEmitterConfig.class);
  }

  @Provides @LazySingleton @Named("http")
  public Emitter getEmitter(Supplier<HttpEmitterConfig> config, @Nullable SSLContext sslContext, Lifecycle lifecycle)
  {
    final HttpClientConfig.Builder clientConfigBuilder =
        HttpClientConfig.builder()
                        .withNumConnections(1)
                        .withReadTimeout(config.get().getReadTimeout());

    // TLS is optional; only configure it when an SSLContext was bound.
    if (sslContext != null) {
      clientConfigBuilder.withSslContext(sslContext);
    }

    return new HttpPostEmitter(config.get(), HttpClientInit.createClient(clientConfigBuilder.build(), lifecycle));
  }
}

View File

@ -27,6 +27,7 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.CuratorConfig;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.curator.discovery.AddressPortServiceInstanceFactory;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
@ -42,7 +43,6 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.zookeeper.data.Stat;
import org.mortbay.jetty.Connector;
@ -221,16 +221,16 @@ public class Initialization
return framework;
}
public static ServiceDiscovery makeServiceDiscoveryClient(
public static ServiceDiscovery<Void> makeServiceDiscoveryClient(
CuratorFramework discoveryClient,
ServiceDiscoveryConfig config,
CuratorDiscoveryConfig config,
Lifecycle lifecycle
)
throws Exception
{
final ServiceDiscovery serviceDiscovery =
final ServiceDiscovery<Void> serviceDiscovery =
ServiceDiscoveryBuilder.builder(Void.class)
.basePath(config.getDiscoveryPath())
.basePath(config.getPath())
.client(discoveryClient)
.build();
@ -260,21 +260,21 @@ public class Initialization
}
public static ServiceAnnouncer makeServiceAnnouncer(
ServiceDiscoveryConfig config,
ServiceDiscovery serviceDiscovery
DruidNodeConfig config,
ServiceDiscovery<Void> serviceDiscovery
)
{
final ServiceInstanceFactory serviceInstanceFactory = makeServiceInstanceFactory(config);
final ServiceInstanceFactory<Void> serviceInstanceFactory = makeServiceInstanceFactory(config);
return new CuratorServiceAnnouncer(serviceDiscovery, serviceInstanceFactory);
}
public static void announceDefaultService(
final ServiceDiscoveryConfig config,
final DruidNodeConfig nodeConfig,
final ServiceAnnouncer serviceAnnouncer,
final Lifecycle lifecycle
) throws Exception
{
final String service = config.getServiceName().replace('/', ':');
final String service = nodeConfig.getServiceName().replace('/', ':');
lifecycle.addHandler(
new Lifecycle.Handler()
@ -357,7 +357,7 @@ public class Initialization
);
}
public static ServiceInstanceFactory<Void> makeServiceInstanceFactory(ServiceDiscoveryConfig config)
public static ServiceInstanceFactory<Void> makeServiceInstanceFactory(DruidNodeConfig config)
{
final String host = config.getHost();
final String address;

View File

@ -0,0 +1,48 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.initialization;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Named;
import com.metamx.druid.guice.JsonConfigProvider;
import com.metamx.druid.guice.LazySingleton;
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.LoggingEmitter;
import com.metamx.emitter.core.LoggingEmitterConfig;
/**
*/
/**
 * Binds the "logging" emitter, which serializes events with the given
 * {@code ObjectMapper} and writes them to the logger configured by
 * {@code LoggingEmitterConfig}.
 */
public class LogEmitterModule implements Module
{
  @Override
  public void configure(Binder binder)
  {
    JsonConfigProvider.bind(binder, "druid.emitter.logging", LoggingEmitterConfig.class);
  }

  @Provides @LazySingleton @Named("logging")
  public Emitter makeEmitter(LoggingEmitterConfig config, ObjectMapper jsonMapper)
  {
    final Emitter loggingEmitter = new LoggingEmitter(config, jsonMapper);
    return loggingEmitter;
  }
}

View File

@ -0,0 +1,135 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.initialization;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.CuratorConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;
import org.skife.config.ConfigurationObjectFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties;
/**
*/
/**
 * Loads the effective Properties for the process from three layers —
 * system properties, a classpath properties file, and zookeeper-stored
 * properties — chained via java.util.Properties defaults, and binds the
 * merged result as the injectable Properties instance.
 * Lookup order (highest wins): system properties, then the file, then ZK.
 */
public class PropertiesModule implements Module
{
private static final Logger log = new Logger(PropertiesModule.class);
// Classpath resource name of the properties file (e.g. "runtime.properties").
private final String propertiesFile;
public PropertiesModule(String propertiesFile)
{
this.propertiesFile = propertiesFile;
}
@Override
public void configure(Binder binder)
{
// Three-level chain: tmp_props -> fileProps -> zkProps (defaults cascade downward).
final Properties zkProps = new Properties();
final Properties fileProps = new Properties(zkProps);
// Note that zookeeper coordinates must be either in cmdLine or in runtime.properties
Properties sp = System.getProperties();
Properties tmp_props = new Properties(fileProps); // the head of the 3 level Properties chain
tmp_props.putAll(sp);
// Layer 2: the classpath properties file, if present.
final InputStream stream = ClassLoader.getSystemResourceAsStream(propertiesFile);
if (stream == null) {
log.info("%s not found on classpath, relying only on system properties and zookeeper.", propertiesFile);
} else {
log.info("Loading properties from %s", propertiesFile);
try {
try {
fileProps.load(stream);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
finally {
Closeables.closeQuietly(stream);
}
}
// log properties from file; stringPropertyNames() would normally cascade down into the sub Properties objects, but
// zkProps (the parent level) is empty at this point so it will only log properties from runtime.properties
for (String prop : fileProps.stringPropertyNames()) {
log.info("Loaded(runtime.properties) Property[%s] as [%s]", prop, fileProps.getProperty(prop));
}
// Layer 3: properties stored in zookeeper, reachable only if ZK hosts are configured.
final String zkHostsProperty = "druid.zk.service.host";
if (tmp_props.getProperty(zkHostsProperty) != null) {
final ConfigurationObjectFactory factory = Config.createFactory(tmp_props);
ZkPathsConfig config;
try {
config = factory.build(ZkPathsConfig.class);
}
catch (IllegalArgumentException e) {
log.warn(e, "Unable to build ZkPathsConfig. Cannot load properties from ZK.");
config = null;
}
if (config != null) {
// Short-lived lifecycle: the curator client exists only long enough to read ZK props.
Lifecycle lifecycle = new Lifecycle();
try {
CuratorFramework curator = Initialization.makeCuratorFramework(factory.build(CuratorConfig.class), lifecycle);
lifecycle.start();
final Stat stat = curator.checkExists().forPath(config.getPropertiesPath());
if (stat != null) {
final byte[] data = curator.getData().forPath(config.getPropertiesPath());
zkProps.load(new InputStreamReader(new ByteArrayInputStream(data), Charsets.UTF_8));
}
// log properties from zk
for (String prop : zkProps.stringPropertyNames()) {
log.info("Loaded(zk) Property[%s] as [%s]", prop, zkProps.getProperty(prop));
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
lifecycle.stop();
}
}
} else {
log.warn("property[%s] not set, skipping ZK-specified properties.", zkHostsProperty);
}
// Bind the head of the chain; lookups fall through system -> file -> zk.
binder.bind(Properties.class).toInstance(tmp_props);
}
}

View File

@ -63,6 +63,10 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</dependency>
<dependency>
<groupId>it.uniroma3.mat</groupId>
<artifactId>extendedset</artifactId>
@ -87,6 +91,14 @@
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>

View File

@ -23,6 +23,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
/**
*/
@ -31,7 +33,19 @@ public class Execs
public static ExecutorService singleThreaded(String nameFormat)
{
return Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build()
makeThreadFactory(nameFormat)
);
}
public static ScheduledExecutorService scheduledSingleThreaded(String nameFormat)
{
return Executors.newSingleThreadScheduledExecutor(
makeThreadFactory(nameFormat)
);
}
public static ThreadFactory makeThreadFactory(String nameFormat)
{
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build();
}
}

View File

@ -19,6 +19,7 @@
package com.metamx.druid.db;
import com.google.common.base.Supplier;
import com.metamx.common.logger.Logger;
import org.apache.commons.dbcp.BasicDataSource;
import org.skife.jdbi.v2.DBI;
@ -159,12 +160,14 @@ public class DbConnector
}
}
private final DbConnectorConfig config;
private final Supplier<DbConnectorConfig> config;
private final Supplier<DbTablesConfig> dbTables;
private final DBI dbi;
public DbConnector(DbConnectorConfig config)
public DbConnector(Supplier<DbConnectorConfig> config, Supplier<DbTablesConfig> dbTables)
{
this.config = config;
this.dbTables = dbTables;
this.dbi = new DBI(getDatasource());
}
@ -176,16 +179,28 @@ public class DbConnector
private DataSource getDatasource()
{
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUsername(config.getDatabaseUser());
dataSource.setPassword(config.getDatabasePassword());
dataSource.setUrl(config.getDatabaseConnectURI());
DbConnectorConfig connectorConfig = config.get();
if (config.isValidationQuery()) {
dataSource.setValidationQuery(config.getValidationQuery());
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUsername(connectorConfig.getUser());
dataSource.setPassword(connectorConfig.getPassword());
dataSource.setUrl(connectorConfig.getConnectURI());
if (connectorConfig.isUseValidationQuery()) {
dataSource.setValidationQuery(connectorConfig.getValidationQuery());
dataSource.setTestOnBorrow(true);
}
return dataSource;
}
public void createSegmentTable()
{
createSegmentTable(dbi, dbTables.get().getSegmentsTable());
}
public void createRulesTable()
{
createRuleTable(dbi, dbTables.get().getRulesTable());
}
}

View File

@ -20,37 +20,60 @@
package com.metamx.druid.db;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.skife.config.Config;
import javax.validation.constraints.NotNull;
/**
*/
public abstract class DbConnectorConfig
{
@JsonProperty("connectURI")
@Config("druid.database.connectURI")
public abstract String getDatabaseConnectURI();
@JsonProperty
private boolean createTables = true;
@JsonProperty("user")
@Config("druid.database.user")
public abstract String getDatabaseUser();
@JsonProperty
@NotNull
private String connectURI = null;
@JsonProperty("password")
@Config("druid.database.password")
public abstract String getDatabasePassword();
@JsonProperty
@NotNull
private String user = null;
@JsonProperty("segmentTable")
@Config("druid.database.segmentTable")
public abstract String getSegmentTable();
@JsonProperty
@NotNull
private String password = null;
@JsonProperty("useValidationQuery")
@Config("druid.database.validation")
public boolean isValidationQuery() {
return false;
@JsonProperty
private boolean useValidationQuery = false;
@JsonProperty
private String validationQuery = "SELECT 1";
public boolean isCreateTables()
{
return createTables;
}
public String getConnectURI()
{
return connectURI;
}
public String getUser()
{
return user;
}
public String getPassword()
{
return password;
}
public boolean isUseValidationQuery()
{
return useValidationQuery;
}
@JsonProperty("validationQuery")
@Config("druid.database.validationQuery")
public String getValidationQuery() {
return "SELECT 1";
return validationQuery;
}
}

View File

@ -0,0 +1,69 @@
package com.metamx.druid.db;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.ISE;
import javax.validation.constraints.NotNull;
/**
*/
/**
 * Configuration for the names of the Druid metadata DB tables. Table names may be given
 * explicitly ("segments", "rules") or derived from a common "base" prefix as
 * {@code <base>_<suffix>}.
 */
public class DbTablesConfig
{
  /** Convenience factory: derive all table names from a common base prefix. */
  public static DbTablesConfig fromBase(String base)
  {
    return new DbTablesConfig(base, null, null);
  }

  @NotNull
  private final String base;

  @NotNull
  private final String segmentsTable;

  @NotNull
  private final String rulesTable;

  @JsonCreator
  public DbTablesConfig(
      @JsonProperty("base") String base,
      @JsonProperty("segments") String segmentsTable,
      @JsonProperty("rules") String rulesTable
  )
  {
    // base must be assigned before makeTableName() is called, since that method reads it.
    this.base = base;
    this.segmentsTable = makeTableName(segmentsTable, "segments");
    this.rulesTable = makeTableName(rulesTable, "rules");
  }

  /**
   * Resolve a table name: use the explicit name when given, otherwise {@code <base>_<suffix>}.
   *
   * @throws ISE if neither an explicit name nor a base prefix is available
   */
  private String makeTableName(String explicitTableName, String defaultSuffix)
  {
    if (explicitTableName == null) {
      if (base == null) {
        throw new ISE("table[%s] unknown! Both base and %s were null!", defaultSuffix, defaultSuffix);
      }
      return String.format("%s_%s", base, defaultSuffix);
    }

    return explicitTableName;
  }

  @JsonProperty
  public String getBase()
  {
    return base;
  }

  @JsonProperty("segments")
  public String getSegmentsTable()
  {
    return segmentsTable;
  }

  @JsonProperty("rules")
  public String getRulesTable()
  {
    return rulesTable;
  }
}

View File

@ -0,0 +1,45 @@
package com.metamx.druid.guice;
import com.google.common.base.Preconditions;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Provider;
import org.skife.config.ConfigurationObjectFactory;
/**
*/
/**
 * A Guice {@link Provider} that materializes a skife-config object of the given class
 * from the injected {@link ConfigurationObjectFactory}. Guice member injection must run
 * {@link #inject} before {@link #get()} is usable.
 */
public class ConfigProvider<T> implements Provider<T>
{
  /** Bind {@code clazz} to a config-backed singleton in the given binder. */
  public static <T> void bind(Binder binder, Class<T> clazz)
  {
    binder.bind(clazz).toProvider(of(clazz)).in(DruidScopes.SINGLETON);
  }

  /** Static factory for a provider of the given config class. */
  public static <T> Provider<T> of(Class<T> clazz)
  {
    return new ConfigProvider<T>(clazz);
  }

  private final Class<T> configClass;

  // Populated by inject(); remains null until member injection has happened.
  private T instance = null;

  public ConfigProvider(
      Class<T> clazz
  )
  {
    this.configClass = clazz;
  }

  @Inject
  public void inject(ConfigurationObjectFactory factory)
  {
    this.instance = factory.build(configClass);
  }

  @Override
  public T get()
  {
    return Preconditions.checkNotNull(instance, "WTF!? Code misconfigured, inject() didn't get called.");
  }
}

View File

@ -0,0 +1,15 @@
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
/**
*/
/**
 * Core Guice extensions for Druid: registers the {@link LazySingleton} scope annotation
 * so bindings annotated with it are handled by {@link DruidScopes#SINGLETON}.
 */
public class DruidGuiceExtensions implements Module
{
  @Override
  public void configure(Binder binder)
  {
    binder.bindScope(LazySingleton.class, DruidScopes.SINGLETON);
  }
}

View File

@ -0,0 +1,71 @@
package com.metamx.druid.guice;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Provider;
import com.google.inject.Scope;
import com.google.inject.Scopes;
import com.metamx.common.lifecycle.Lifecycle;
/**
*/
/**
 * Guice {@link Scope}s used by Druid's bootstrapping modules.
 */
public class DruidScopes
{
  /**
   * Druid's singleton scope; currently delegates directly to Guice's {@link Scopes#SINGLETON}.
   */
  public static final Scope SINGLETON = new Scope()
  {
    @Override
    public <T> Provider<T> scope(Key<T> key, Provider<T> unscoped)
    {
      return Scopes.SINGLETON.scope(key, unscoped);
    }

    @Override
    public String toString()
    {
      return "DruidScopes.SINGLETON";
    }
  };

  /**
   * A singleton scope that also registers each created instance with the injected
   * {@link Lifecycle} at construction time.
   *
   * NOTE(review): the returned Provider depends on Guice running member injection of
   * {@code inject(Lifecycle)} before the first {@code get()} call; if that never happens,
   * {@code provider} stays null and {@code get()} throws NPE -- confirm the wiring
   * guarantees injection.
   */
  public static final Scope LIFECYCLE = new Scope()
  {
    @Override
    public <T> Provider<T> scope(final Key<T> key, final Provider<T> unscoped)
    {
      return new Provider<T>()
      {
        private Provider<T> provider;

        @Inject
        public void inject(final Lifecycle lifecycle)
        {
          provider = Scopes.SINGLETON.scope(
              key,
              new Provider<T>()
              {
                @Override
                public T get()
                {
                  // Hand the instance to the lifecycle as it is created.
                  return lifecycle.addManagedInstance(unscoped.get());
                }
              }
          );
        }

        @Override
        public T get()
        {
          // Fixed: removed leftover debug System.out.println(provider), which wrote to
          // stdout on every single scoped lookup.
          return provider.get();
        }
      };
    }

    @Override
    public String toString()
    {
      return "DruidScopes.LIFECYCLE";
    }
  };
}

View File

@ -0,0 +1,45 @@
package com.metamx.druid.guice;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.metamx.druid.jackson.Json;
import com.metamx.druid.jackson.Smile;
import org.skife.config.ConfigurationObjectFactory;
import java.util.Properties;
/**
*/
/**
 * Re-binds objects created during the primary (bootstrap) injector's construction --
 * the {@link Properties}, the skife {@link ConfigurationObjectFactory}, and the two
 * Jackson mappers -- into a secondary injector, so downstream modules can depend on
 * them as ordinary bindings. Also installs {@link DruidGuiceExtensions} so the
 * {@code @LazySingleton} scope is available in the secondary injector.
 */
public class DruidSecondaryModule implements Module
{
  private final Properties properties;
  private final ConfigurationObjectFactory factory;
  private final ObjectMapper jsonMapper;
  private final ObjectMapper smileMapper;

  @Inject
  public DruidSecondaryModule(
      Properties properties,
      ConfigurationObjectFactory factory,
      @Json ObjectMapper jsonMapper,
      @Smile ObjectMapper smileMapper
  )
  {
    this.properties = properties;
    this.factory = factory;
    this.jsonMapper = jsonMapper;
    this.smileMapper = smileMapper;
  }

  @Override
  public void configure(Binder binder)
  {
    binder.install(new DruidGuiceExtensions());
    // Expose the bootstrap-phase objects as instance bindings in this injector.
    binder.bind(Properties.class).toInstance(properties);
    binder.bind(ConfigurationObjectFactory.class).toInstance(factory);
    binder.bind(ObjectMapper.class).annotatedWith(Json.class).toInstance(jsonMapper);
    binder.bind(ObjectMapper.class).annotatedWith(Smile.class).toInstance(smileMapper);
  }
}

View File

@ -0,0 +1,121 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.TypeLiteral;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import javax.validation.ConstraintViolation;
import javax.validation.Validator;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
*/
/**
 * Provides a {@code Supplier<T>} built by collecting all properties under a given prefix,
 * parsing each value as JSON, converting the resulting map into an instance of
 * {@code classToProvide} via Jackson, and bean-validating the result.
 *
 * Guice member injection must run {@link #inject} before {@link #get()} is usable; until
 * then {@link #get()} returns null.
 */
public class JsonConfigProvider<T> implements Provider<Supplier<T>>
{
  private static final Joiner JOINER = Joiner.on(", ");

  /** Bind {@code Supplier<T>} to a property-backed singleton config in the given binder. */
  public static <T> void bind(Binder binder, String propertyBase, Class<T> classToProvide)
  {
    binder.bind(new TypeLiteral<Supplier<T>>(){}).toProvider(of(propertyBase, classToProvide)).in(DruidScopes.SINGLETON);
  }

  public static <T> JsonConfigProvider<T> of(String propertyBase, Class<T> classToProvide)
  {
    return new JsonConfigProvider<T>(propertyBase, classToProvide);
  }

  private final String propertyBase;
  private final Class<T> classToProvide;

  // Populated by inject(); null until member injection has happened.
  private Supplier<T> supplier;

  public JsonConfigProvider(
      String propertyBase,
      Class<T> classToProvide
  )
  {
    this.propertyBase = propertyBase;
    this.classToProvide = classToProvide;
  }

  /**
   * Builds and validates the config object from the injected properties.
   *
   * @throws IAE if a property value cannot be parsed as JSON
   * @throws ISE if the constructed object fails bean validation
   */
  @Inject
  public void inject(
      Properties props,
      ObjectMapper jsonMapper,
      Validator validator
  )
  {
    Map<String, Object> jsonMap = Maps.newHashMap();
    for (String prop : props.stringPropertyNames()) {
      if (prop.startsWith(propertyBase)) {
        final String propValue = props.getProperty(prop);
        try {
          // NOTE(review): the map key is everything after propertyBase verbatim, so it keeps
          // a leading '.' unless propertyBase ends with one -- confirm callers pass a
          // trailing-dot base.
          jsonMap.put(prop.substring(propertyBase.length()), jsonMapper.readValue(propValue, Object.class));
        }
        catch (IOException e) {
          // Preserve the Jackson failure as the cause so the underlying parse error isn't lost.
          throw new IAE(e, "Unable to parse an object out of prop[%s]=[%s]", prop, propValue);
        }
      }
    }

    final T config = jsonMapper.convertValue(jsonMap, classToProvide);

    final Set<ConstraintViolation<T>> violations = validator.validate(config);
    if (!violations.isEmpty()) {
      List<String> messages = Lists.newArrayList();
      for (ConstraintViolation<T> violation : violations) {
        messages.add(String.format("%s - %s", violation.getPropertyPath().toString(), violation.getMessage()));
      }
      throw new ISE("Configuration violations[%s]", JOINER.join(messages));
    }

    this.supplier = new Supplier<T>()
    {
      @Override
      public T get()
      {
        return config;
      }
    };
  }

  @Override
  public Supplier<T> get()
  {
    return supplier;
  }
}

View File

@ -0,0 +1,18 @@
package com.metamx.druid.guice;
import com.google.inject.ScopeAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
 * Scope annotation marking a binding as a lazily-created singleton, backed by
 * {@link DruidScopes#SINGLETON} (registered via {@link DruidGuiceExtensions}).
 */
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RUNTIME)
@ScopeAnnotation
public @interface LazySingleton
{
}

View File

@ -0,0 +1,74 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.lifecycle.Lifecycle;
/**
* A Module to add lifecycle management to the injector. {@link DruidGuiceExtensions} must also be included.
*/
public class LifecycleModule implements Module
{
  private final LifecycleScope scope = new LifecycleScope();
  private final Key<?>[] eagerClasses;

  /**
   * A constructor that takes a list of classes to instantiate eagerly.  Class {@link Key}s mentioned here will
   * be pulled out of the injector with an injector.getInstance() call when the lifecycle is created.
   *
   * Eagerly loaded classes will *not* be automatically added to the Lifecycle unless they are bound to the proper
   * scope.
   *
   * This mechanism exists to allow the {@link Lifecycle} to be the primary entry point from the injector, not to
   * auto-register things with the {@link Lifecycle}
   *
   * @param eagerClasses set of classes to instantiate eagerly
   */
  public LifecycleModule(
      Key<?>... eagerClasses
  )
  {
    this.eagerClasses = eagerClasses;
  }

  @Override
  public void configure(Binder binder)
  {
    // Bindings annotated/scoped with @ManageLifecycle get registered with the Lifecycle.
    binder.bindScope(ManageLifecycle.class, scope);
  }

  /**
   * Creates the singleton {@link Lifecycle}, hands it to the scope (which flushes any
   * instances created before the Lifecycle existed), and eagerly instantiates the
   * requested keys.
   */
  @Provides @LazySingleton
  public Lifecycle getLifecycle(Injector injector)
  {
    Lifecycle lifecycle = new Lifecycle();
    scope.setLifecycle(lifecycle);
    for (Key<?> key : eagerClasses) {
      injector.getInstance(key); // Pull the key so as to "eagerly" load up the class.
    }
    return lifecycle;
  }
}

View File

@ -0,0 +1,67 @@
package com.metamx.druid.guice;
import com.google.common.collect.Lists;
import com.google.inject.Key;
import com.google.inject.Provider;
import com.google.inject.Scope;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import java.util.List;
/**
* A scope that adds objects to the Lifecycle. This is by definition also a lazy singleton scope.
*/
/**
 * A scope that adds objects to the Lifecycle.  This is by definition also a lazy singleton scope.
 *
 * Instances created before {@link #setLifecycle} is called are buffered and registered with
 * the Lifecycle once it arrives.  All access to the lifecycle/instances pair is guarded by
 * the {@code instances} monitor.
 */
public class LifecycleScope implements Scope
{
  private static final Logger log = new Logger(LifecycleScope.class);

  // Guards both itself and the lifecycle reference; holds instances created pre-lifecycle.
  private final List<Object> instances = Lists.newLinkedList();

  private Lifecycle lifecycle;

  public void setLifecycle(Lifecycle lifecycle)
  {
    // Fixed: assign inside the monitor so the write is safely published to scope()'s
    // get(), which reads the field under the same lock.
    synchronized (instances) {
      this.lifecycle = lifecycle;

      for (Object instance : instances) {
        lifecycle.addManagedInstance(instance);
      }
    }
  }

  @Override
  public <T> Provider<T> scope(final Key<T> key, final Provider<T> unscoped)
  {
    return new Provider<T>()
    {
      private T value = null;

      @Override
      public synchronized T get()
      {
        if (value == null) {
          final T retVal = unscoped.get();

          synchronized (instances) {
            if (lifecycle == null) {
              // Lifecycle not created yet; buffer for setLifecycle() to flush.
              instances.add(retVal);
            }
            else {
              try {
                lifecycle.addMaybeStartManagedInstance(retVal);
              }
              catch (Exception e) {
                log.warn(e, "Caught exception when trying to create a[%s]", key);
                return null;
              }
            }
          }

          value = retVal;
        }

        return value;
      }
    };
  }
}

View File

@ -0,0 +1,19 @@
package com.metamx.druid.guice;
import com.google.inject.ScopeAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
 * Marks the object to be managed by {@link com.metamx.common.lifecycle.Lifecycle}
 *
 * Acts as a scope annotation (bound to {@link LifecycleScope} by {@link LifecycleModule}),
 * so annotated bindings are lazy singletons registered with the Lifecycle on creation.
 */
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RUNTIME)
@ScopeAnnotation
public @interface ManageLifecycle
{
}

View File

@ -0,0 +1,53 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.jackson;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.druid.guice.LazySingleton;
/**
*/
/**
 * Binds the Jackson {@link ObjectMapper}s used by the system: a JSON mapper ({@code @Json}),
 * a Smile mapper ({@code @Smile}), and aliases the un-annotated ObjectMapper to the JSON one.
 */
public class JacksonModule implements Module
{
  @Override
  public void configure(Binder binder)
  {
    // A request for a plain ObjectMapper resolves to the @Json-annotated mapper.
    binder.bind(ObjectMapper.class).to(Key.get(ObjectMapper.class, Json.class));
  }

  @Provides @LazySingleton @Json
  public ObjectMapper jsonMapper()
  {
    return new DefaultObjectMapper();
  }

  @Provides @LazySingleton @Smile
  public ObjectMapper smileMapper()
  {
    ObjectMapper retVal = new DefaultObjectMapper(new SmileFactory());
    // Point the factory's codec back at the mapper so tree/value (de)serialization works.
    retVal.getJsonFactory().setCodec(retVal);
    return retVal;
  }
}

View File

@ -0,0 +1,17 @@
package com.metamx.druid.jackson;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Binding annotation identifying the JSON-flavored {@code ObjectMapper}
 * (see {@link JacksonModule}).
 */
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface Json
{
}

View File

@ -0,0 +1,17 @@
package com.metamx.druid.jackson;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Binding annotation identifying the Smile (binary JSON) flavored {@code ObjectMapper}
 * (see {@link JacksonModule}).
 */
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface Smile
{
}

View File

@ -0,0 +1,39 @@
package com.metamx.druid.utils;
import com.metamx.common.ISE;
import java.io.IOException;
import java.net.ServerSocket;
/**
*/
/**
 * Utility for locating a free TCP port by attempting to bind successive ports.
 */
public class SocketUtil
{
  /**
   * Finds an open port at or above {@code startPort} by trying to bind a ServerSocket to
   * each port in turn.  Note the port is released again before returning, so another
   * process can race to grab it.
   *
   * @param startPort first port to try (ports up to, but not including, 0xffff are tried)
   * @return the first port that could be bound
   * @throws ISE if no port in the range could be bound
   */
  public static int findOpenPort(int startPort)
  {
    int currPort = startPort;

    while (currPort < 0xffff) {
      // try-with-resources replaces the manual try/finally + swallowed close() of the
      // original; the socket is closed immediately -- we only care that the bind succeeded.
      try (ServerSocket ignored = new ServerSocket(currPort)) {
        return currPort;
      }
      catch (IOException e) {
        // Port unavailable; try the next one.
        ++currPort;
      }
    }

    throw new ISE("Unable to find open port between[%d] and [%d]", startPort, currPort);
  }
}

View File

@ -0,0 +1,231 @@
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import junit.framework.Assert;
import org.junit.Test;
/**
*/
/**
 * Tests that the {@code ManageLifecycle} scope registers scoped singletons with the
 * {@code Lifecycle}, whether the scope is applied via annotation on the implementation
 * class or explicitly at bind time.
 */
public class LifecycleScopeTest
{
  // Scope applied via the @ManageLifecycle annotation on AnnotatedClass.
  @Test
  public void testAnnotation() throws Exception
  {
    final Injector injector = Guice.createInjector(
        new DruidGuiceExtensions(),
        new LifecycleModule(),
        new Module()
        {
          @Override
          public void configure(Binder binder)
          {
            binder.bind(TestInterface.class).to(AnnotatedClass.class);
          }
        }
    );

    final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
    final TestInterface instance = injector.getInstance(TestInterface.class);

    testIt(injector, lifecycle, instance);
  }

  // Scope applied explicitly with .in(ManageLifecycle.class) on an unannotated class.
  @Test
  public void testExplicit() throws Exception
  {
    final Injector injector = Guice.createInjector(
        new DruidGuiceExtensions(),
        new LifecycleModule(),
        new Module()
        {
          @Override
          public void configure(Binder binder)
          {
            binder.bind(TestInterface.class).to(ExplicitClass.class).in(ManageLifecycle.class);
          }
        }
    );

    final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
    final TestInterface instance = injector.getInstance(TestInterface.class);

    testIt(injector, lifecycle, instance);
  }

  // Common assertions: start/stop happen exactly once and the binding is a singleton.
  private void testIt(Injector injector, Lifecycle lifecycle, TestInterface instance)
      throws Exception
  {
    Assert.assertEquals(0, instance.getStarted());
    Assert.assertEquals(0, instance.getStopped());
    Assert.assertEquals(0, instance.getRan());

    instance.run();

    Assert.assertEquals(0, instance.getStarted());
    Assert.assertEquals(0, instance.getStopped());
    Assert.assertEquals(1, instance.getRan());

    lifecycle.start();

    Assert.assertEquals(1, instance.getStarted());
    Assert.assertEquals(0, instance.getStopped());
    Assert.assertEquals(1, instance.getRan());

    injector.getInstance(TestInterface.class).run(); // It's a singleton

    Assert.assertEquals(1, instance.getStarted());
    Assert.assertEquals(0, instance.getStopped());
    Assert.assertEquals(2, instance.getRan());

    lifecycle.stop();

    Assert.assertEquals(1, instance.getStarted());
    Assert.assertEquals(1, instance.getStopped());
    Assert.assertEquals(2, instance.getRan());
  }

  /**
   * This is a test for documentation purposes.  It's there to show what weird things Guice will do when
   * it sees both the annotation and an explicit binding.
   *
   * @throws Exception
   */
  @Test
  public void testAnnotatedAndExplicit() throws Exception
  {
    final Injector injector = Guice.createInjector(
        new DruidGuiceExtensions(),
        new LifecycleModule(),
        new Module()
        {
          @Override
          public void configure(Binder binder)
          {
            binder.bind(TestInterface.class).to(AnnotatedClass.class).in(ManageLifecycle.class);
          }
        }
    );

    final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
    final TestInterface instance = injector.getInstance(TestInterface.class);

    Assert.assertEquals(0, instance.getStarted());
    Assert.assertEquals(0, instance.getStopped());
    Assert.assertEquals(0, instance.getRan());

    instance.run();

    Assert.assertEquals(0, instance.getStarted());
    Assert.assertEquals(0, instance.getStopped());
    Assert.assertEquals(1, instance.getRan());

    lifecycle.start();

    // Doubly-scoped binding: the instance gets started (and later stopped) twice.
    Assert.assertEquals(2, instance.getStarted());
    Assert.assertEquals(0, instance.getStopped());
    Assert.assertEquals(1, instance.getRan());

    injector.getInstance(TestInterface.class).run(); // It's a singleton

    Assert.assertEquals(2, instance.getStarted());
    Assert.assertEquals(0, instance.getStopped());
    Assert.assertEquals(2, instance.getRan());

    lifecycle.stop();

    Assert.assertEquals(2, instance.getStarted());
    Assert.assertEquals(2, instance.getStopped());
    Assert.assertEquals(2, instance.getRan());
  }

  // Test fixture interface; counters expose how many times each phase ran.
  private static interface TestInterface
  {
    public void run();
    public int getStarted();
    public int getStopped();
    public int getRan();
  }

  // Fixture carrying the scope annotation itself.
  @ManageLifecycle
  public static class AnnotatedClass implements TestInterface
  {
    int started = 0;
    int stopped = 0;
    int ran = 0;

    @LifecycleStart
    public void start()
    {
      ++started;
    }

    @LifecycleStop
    public void stop()
    {
      ++stopped;
    }

    @Override
    public void run()
    {
      ++ran;
    }

    public int getStarted()
    {
      return started;
    }

    public int getStopped()
    {
      return stopped;
    }

    public int getRan()
    {
      return ran;
    }
  }

  // Fixture with no annotation; scoped explicitly at bind time.
  public static class ExplicitClass implements TestInterface
  {
    int started = 0;
    int stopped = 0;
    int ran = 0;

    @LifecycleStart
    public void start()
    {
      ++started;
    }

    @LifecycleStop
    public void stop()
    {
      ++stopped;
    }

    @Override
    public void run()
    {
      ++ran;
    }

    public int getStarted()
    {
      return started;
    }

    public int getStopped()
    {
      return stopped;
    }

    public int getRan()
    {
      return ran;
    }
  }
}

View File

@ -25,8 +25,8 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
import com.metamx.druid.merger.common.task.Task;
@ -65,7 +65,7 @@ public class TaskMasterLifecycle
final TaskQueue taskQueue,
final TaskActionClientFactory taskActionClientFactory,
final IndexerCoordinatorConfig indexerCoordinatorConfig,
final ServiceDiscoveryConfig serviceDiscoveryConfig,
final DruidNodeConfig nodeConfig,
final TaskRunnerFactory runnerFactory,
final ResourceManagementSchedulerFactory managementSchedulerFactory,
final CuratorFramework curator,
@ -103,7 +103,7 @@ public class TaskMasterLifecycle
final Lifecycle leaderLifecycle = new Lifecycle();
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addManagedInstance(taskRunner);
Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle);
Initialization.announceDefaultService(nodeConfig, serviceAnnouncer, leaderLifecycle);
leaderLifecycle.addManagedInstance(taskConsumer);
leaderLifecycle.addManagedInstance(resourceManagementScheduler);

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@ -55,9 +56,10 @@ import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.RedirectFilter;
import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory;
@ -156,7 +158,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
private DBI dbi = null;
private IndexerCoordinatorConfig config = null;
private MergerDBCoordinator mergerDBCoordinator = null;
private ServiceDiscovery serviceDiscovery = null;
private ServiceDiscovery<Void> serviceDiscovery = null;
private ServiceAnnouncer serviceAnnouncer = null;
private TaskStorage taskStorage = null;
private TaskQueue taskQueue = null;
@ -363,12 +365,12 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
private void initializeTaskMasterLifecycle()
{
if (taskMasterLifecycle == null) {
final ServiceDiscoveryConfig serviceDiscoveryConfig = getConfigFactory().build(ServiceDiscoveryConfig.class);
final DruidNodeConfig nodeConfig = getConfigFactory().build(DruidNodeConfig.class);
taskMasterLifecycle = new TaskMasterLifecycle(
taskQueue,
taskActionClientFactory,
config,
serviceDiscoveryConfig,
nodeConfig,
taskRunnerFactory,
resourceManagementSchedulerFactory,
getCuratorFramework(),
@ -532,7 +534,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
dbConnectorConfig = getConfigFactory().build(DbConnectorConfig.class);
}
if (dbi == null) {
dbi = new DbConnector(dbConnectorConfig).getDBI();
dbi = new DbConnector(Suppliers.ofInstance(dbConnectorConfig), null).getDBI(); // TODO
}
}
@ -566,14 +568,15 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
public void initializeServiceDiscovery() throws Exception
{
final ServiceDiscoveryConfig config = getConfigFactory().build(ServiceDiscoveryConfig.class);
final CuratorDiscoveryConfig config = getConfigFactory().build(CuratorDiscoveryConfig.class);
if (serviceDiscovery == null) {
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
getCuratorFramework(), config, getLifecycle()
);
}
if (serviceAnnouncer == null) {
final ServiceInstanceFactory instanceFactory = Initialization.makeServiceInstanceFactory(config);
DruidNodeConfig nodeConfig = getConfigFactory().build(DruidNodeConfig.class);
final ServiceInstanceFactory<Void> instanceFactory = Initialization.makeServiceInstanceFactory(nodeConfig);
this.serviceAnnouncer = new CuratorServiceAnnouncer(serviceDiscovery, instanceFactory);
}
}
@ -614,7 +617,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
taskStorage = new DbTaskStorage(
getJsonMapper(),
dbConnectorConfig,
new DbConnector(dbConnectorConfig).getDBI()
new DbConnector(Suppliers.<DbConnectorConfig>ofInstance(dbConnectorConfig), null).getDBI() // TODO
);
} else {
throw new ISE("Invalid storage implementation: %s", config.getStorageImpl());

View File

@ -43,10 +43,11 @@ import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
@ -56,8 +57,8 @@ import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.merger.common.index.ChatHandlerProvider;
import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner;
import com.metamx.druid.merger.worker.config.ChatHandlerProviderConfig;
@ -114,7 +115,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
private WorkerConfig workerConfig = null;
private DataSegmentPusher segmentPusher = null;
private TaskToolboxFactory taskToolboxFactory = null;
private ServiceDiscovery serviceDiscovery = null;
private ServiceDiscovery<Void> serviceDiscovery = null;
private ServiceAnnouncer serviceAnnouncer = null;
private ServiceProvider coordinatorServiceProvider = null;
private Server server = null;
@ -391,14 +392,15 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
public void initializeServiceDiscovery() throws Exception
{
final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class);
final CuratorDiscoveryConfig config = configFactory.build(CuratorDiscoveryConfig.class);
if (serviceDiscovery == null) {
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
getCuratorFramework(), config, lifecycle
);
}
if (serviceAnnouncer == null) {
final ServiceInstanceFactory instanceFactory = Initialization.makeServiceInstanceFactory(config);
DruidNodeConfig nodeConfig = configFactory.build(DruidNodeConfig.class);
final ServiceInstanceFactory<Void> instanceFactory = Initialization.makeServiceInstanceFactory(nodeConfig);
this.serviceAnnouncer = new CuratorServiceAnnouncer(serviceDiscovery, instanceFactory);
}
if (coordinatorServiceProvider == null) {

View File

@ -34,14 +34,12 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.QueryableNode;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskLogConfig;
@ -332,7 +330,7 @@ public class WorkerNode extends QueryableNode<WorkerNode>
public void initializeServiceDiscovery() throws Exception
{
if (serviceDiscovery == null) {
final ServiceDiscoveryConfig config = getConfigFactory().build(ServiceDiscoveryConfig.class);
final CuratorDiscoveryConfig config = getConfigFactory().build(CuratorDiscoveryConfig.class);
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
getCuratorFramework(),
config,

View File

@ -38,7 +38,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.22.3</metamx.java-util.version>
<metamx.java-util.version>0.23.0-SNAPSHOT</metamx.java-util.version>
<netflix.curator.version>2.0.1-21-22</netflix.curator.version>
</properties>
@ -219,6 +219,11 @@
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.11</version>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>5.0.1.Final</version>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>

View File

@ -21,17 +21,20 @@ package com.metamx.druid.db;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.guice.ManageLifecycle;
import com.metamx.druid.master.rules.PeriodLoadRule;
import com.metamx.druid.master.rules.Rule;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
@ -52,12 +55,13 @@ import java.util.concurrent.atomic.AtomicReference;
/**
*/
@ManageLifecycle
public class DatabaseRuleManager
{
public static void createDefaultRule(
final DBI dbi,
final String ruleTable,
final String defaultDatasource,
final String defaultTier,
final ObjectMapper jsonMapper
)
{
@ -72,7 +76,7 @@ public class DatabaseRuleManager
String.format(
"SELECT id from %s where datasource='%s';",
ruleTable,
defaultDatasource
defaultTier
)
);
@ -94,8 +98,8 @@ public class DatabaseRuleManager
ruleTable
)
)
.bind("id", String.format("%s_%s", defaultDatasource, version))
.bind("dataSource", defaultDatasource)
.bind("id", String.format("%s_%s", defaultTier, version))
.bind("dataSource", defaultTier)
.bind("version", version)
.bind("payload", jsonMapper.writeValueAsString(defaultRules))
.execute();
@ -115,6 +119,7 @@ public class DatabaseRuleManager
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService exec;
private final DatabaseRuleManagerConfig config;
private final Supplier<DbTablesConfig> dbTables;
private final DBI dbi;
private final AtomicReference<ConcurrentHashMap<String, List<Rule>>> rules;
@ -122,18 +127,21 @@ public class DatabaseRuleManager
private volatile boolean started = false;
@Inject
public DatabaseRuleManager(
ObjectMapper jsonMapper,
ScheduledExecutorService exec,
DatabaseRuleManagerConfig config,
Supplier<DbTablesConfig> dbTables,
DBI dbi
)
{
this.jsonMapper = jsonMapper;
this.exec = exec;
this.config = config;
this.dbTables = dbTables;
this.dbi = dbi;
this.exec = Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d");
this.rules = new AtomicReference<ConcurrentHashMap<String, List<Rule>>>(
new ConcurrentHashMap<String, List<Rule>>()
);
@ -147,6 +155,7 @@ public class DatabaseRuleManager
return;
}
createDefaultRule(dbi, getRulesTable(), config.getDefaultTier(), jsonMapper);
ScheduledExecutors.scheduleWithFixedDelay(
exec,
new Duration(0),
@ -192,8 +201,11 @@ public class DatabaseRuleManager
return handle.createQuery(
// Return latest version rule by dataSource
String.format(
"SELECT %1$s.dataSource, %1$s.payload FROM %1$s INNER JOIN(SELECT dataSource, max(version) as version, payload FROM %1$s GROUP BY dataSource) ds ON %1$s.datasource = ds.datasource and %1$s.version = ds.version",
config.getRuleTable()
"SELECT r.dataSource, r.payload "
+ "FROM %1$s r "
+ "INNER JOIN(SELECT dataSource, max(version) as version, payload FROM %1$s GROUP BY dataSource) ds "
+ "ON r.datasource = ds.datasource and r.version = ds.version",
getRulesTable()
)
).fold(
Maps.<String, List<Rule>>newHashMap(),
@ -255,8 +267,8 @@ public class DatabaseRuleManager
if (theRules.get(dataSource) != null) {
retVal.addAll(theRules.get(dataSource));
}
if (theRules.get(config.getDefaultDatasource()) != null) {
retVal.addAll(theRules.get(config.getDefaultDatasource()));
if (theRules.get(config.getDefaultTier()) != null) {
retVal.addAll(theRules.get(config.getDefaultTier()));
}
return retVal;
}
@ -275,7 +287,7 @@ public class DatabaseRuleManager
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, version, payload) VALUES (:id, :dataSource, :version, :payload)",
config.getRuleTable()
getRulesTable()
)
)
.bind("id", String.format("%s_%s", dataSource, version))
@ -303,4 +315,6 @@ public class DatabaseRuleManager
return true;
}
private String getRulesTable() {return dbTables.get().getRulesTable();}
}

View File

@ -27,12 +27,9 @@ import org.skife.config.Default;
*/
public abstract class DatabaseRuleManagerConfig
{
@Config("druid.database.ruleTable")
public abstract String getRuleTable();
@Config("druid.database.defaultDatasource")
@Config("druid.database.rules.defaultTier")
@Default("_default")
public abstract String getDefaultDatasource();
public abstract String getDefaultTier();
@Config("druid.database.rules.poll.duration")
@Default("PT1M")

View File

@ -21,9 +21,11 @@ package com.metamx.druid.db;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
@ -33,7 +35,8 @@ import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.guice.ManageLifecycle;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
@ -57,6 +60,7 @@ import java.util.concurrent.atomic.AtomicReference;
/**
*/
@ManageLifecycle
public class DatabaseSegmentManager
{
private static final Logger log = new Logger(DatabaseSegmentManager.class);
@ -65,26 +69,30 @@ public class DatabaseSegmentManager
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService exec;
private final DatabaseSegmentManagerConfig config;
private final Supplier<DatabaseSegmentManagerConfig> config;
private final Supplier<DbTablesConfig> dbTables;
private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSources;
private final DBI dbi;
private volatile boolean started = false;
@Inject
public DatabaseSegmentManager(
ObjectMapper jsonMapper,
ScheduledExecutorService exec,
DatabaseSegmentManagerConfig config,
Supplier<DatabaseSegmentManagerConfig> config,
Supplier<DbTablesConfig> dbTables,
DBI dbi
)
{
this.jsonMapper = jsonMapper;
this.exec = exec;
this.config = config;
this.dbTables = dbTables;
this.dataSources = new AtomicReference<ConcurrentHashMap<String, DruidDataSource>>(
new ConcurrentHashMap<String, DruidDataSource>()
);
this.dbi = dbi;
this.exec = Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d");
}
@LifecycleStart
@ -98,7 +106,7 @@ public class DatabaseSegmentManager
ScheduledExecutors.scheduleWithFixedDelay(
exec,
new Duration(0),
config.getPollDuration(),
config.get().getPollDuration(),
new Runnable()
{
@Override
@ -136,7 +144,7 @@ public class DatabaseSegmentManager
public VersionedIntervalTimeline<String, DataSegment> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format("SELECT payload FROM %s WHERE dataSource = :dataSource", config.getSegmentTable())
String.format("SELECT payload FROM %s WHERE dataSource = :dataSource", getSegmentsTable())
)
.bind("dataSource", ds)
.fold(
@ -204,7 +212,7 @@ public class DatabaseSegmentManager
batch.add(
String.format(
"UPDATE %s SET used=1 WHERE id = '%s'",
config.getSegmentTable(),
getSegmentsTable(),
segment.getIdentifier()
)
);
@ -234,7 +242,7 @@ public class DatabaseSegmentManager
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format("UPDATE %s SET used=1 WHERE id = :id", config.getSegmentTable())
String.format("UPDATE %s SET used=1 WHERE id = :id", getSegmentsTable())
)
.bind("id", segmentId)
.execute();
@ -268,7 +276,7 @@ public class DatabaseSegmentManager
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format("UPDATE %s SET used=0 WHERE dataSource = :dataSource", config.getSegmentTable())
String.format("UPDATE %s SET used=0 WHERE dataSource = :dataSource", getSegmentsTable())
)
.bind("dataSource", ds)
.execute();
@ -298,7 +306,7 @@ public class DatabaseSegmentManager
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format("UPDATE %s SET used=0 WHERE id = :segmentID", config.getSegmentTable())
String.format("UPDATE %s SET used=0 WHERE id = :segmentID", getSegmentsTable())
).bind("segmentID", segmentID)
.execute();
@ -354,7 +362,7 @@ public class DatabaseSegmentManager
public List<String> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format("SELECT DISTINCT(datasource) FROM %s", config.getSegmentTable())
String.format("SELECT DISTINCT(datasource) FROM %s", getSegmentsTable())
)
.fold(
Lists.<String>newArrayList(),
@ -398,7 +406,7 @@ public class DatabaseSegmentManager
public List<Map<String, Object>> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format("SELECT payload FROM %s WHERE used=1", config.getSegmentTable())
String.format("SELECT payload FROM %s WHERE used=1", getSegmentsTable())
).list();
}
}
@ -451,4 +459,8 @@ public class DatabaseSegmentManager
log.error(e, "Problem polling DB.");
}
}
private String getSegmentsTable() {
return dbTables.get().getSegmentsTable();
}
}

View File

@ -19,18 +19,18 @@
package com.metamx.druid.db;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/
public abstract class DatabaseSegmentManagerConfig
public class DatabaseSegmentManagerConfig
{
@Config("druid.database.segmentTable")
public abstract String getSegmentTable();
@JsonProperty
private Duration pollDuration = new Duration("PT1M");
@Config("druid.database.poll.duration")
@Default("PT1M")
public abstract Duration getPollDuration();
public Duration getPollDuration()
{
return pollDuration;
}
}

View File

@ -0,0 +1,54 @@
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.db.DbTablesConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.skife.jdbi.v2.DBI;
/**
*/
/**
 * Guice bindings for the Druid master node: Zookeeper paths, server-inventory
 * config, and database connectivity (with optional table auto-creation tied to
 * the lifecycle).
 */
public class MasterModule implements Module
{
  @Override
  public void configure(Binder binder)
  {
    // Skife-config backed configs go through ConfigProvider; the table-name
    // config is deserialized from properties under "druid.database.tables".
    ConfigProvider.bind(binder, ZkPathsConfig.class);
    ConfigProvider.bind(binder, ServerInventoryViewConfig.class);
    ConfigProvider.bind(binder, DbConnectorConfig.class);
    JsonConfigProvider.bind(binder, "druid.database.tables", DbTablesConfig.class);
  }

  /**
   * Provides the shared DBI.  When {@code config.isCreateTables()} is set, the
   * segment and rules tables are created at lifecycle start rather than at
   * injection time, so the database is only touched once the node comes up.
   */
  @Provides @LazySingleton
  public DBI getDbi(final DbConnector dbConnector, final DbConnectorConfig config, Lifecycle lifecycle)
  {
    if (!config.isCreateTables()) {
      return dbConnector.getDBI();
    }

    lifecycle.addHandler(
        new Lifecycle.Handler()
        {
          @Override
          public void start() throws Exception
          {
            dbConnector.createSegmentTable();
            dbConnector.createRulesTable();
          }

          @Override
          public void stop()
          {
            // Nothing to tear down: tables are intentionally left in place.
          }
        }
    );

    return dbConnector.getDBI();
  }
}

View File

@ -0,0 +1,16 @@
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.druid.initialization.DruidNodeConfig;
/**
*/
/**
 * Guice bindings common to Druid server nodes.  Currently this only binds
 * {@link DruidNodeConfig}, populated from properties under the "druid" prefix
 * (presumably the node's identity settings — confirm against DruidNodeConfig).
 */
public class ServerModule implements Module
{
@Override
public void configure(Binder binder)
{
// Jackson-based config binding: properties prefixed "druid" -> DruidNodeConfig.
JsonConfigProvider.bind(binder, "druid", DruidNodeConfig.class);
}
}

View File

@ -26,6 +26,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer;
@ -38,7 +39,6 @@ import com.metamx.druid.master.rules.Rule;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;

View File

@ -21,57 +21,56 @@ package com.metamx.druid.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.collect.Iterables;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.config.ConfigManager;
import com.metamx.druid.config.ConfigManagerConfig;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.curator.CuratorModule;
import com.metamx.druid.curator.discovery.DiscoveryModule;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseRuleManagerConfig;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.db.DatabaseSegmentManagerConfig;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.guice.DruidGuiceExtensions;
import com.metamx.druid.guice.DruidSecondaryModule;
import com.metamx.druid.guice.LifecycleModule;
import com.metamx.druid.guice.MasterModule;
import com.metamx.druid.guice.ServerModule;
import com.metamx.druid.initialization.ConfigFactoryModule;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.initialization.EmitterModule;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.PropertiesModule;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.jackson.JacksonModule;
import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterConfig;
import com.metamx.druid.master.LoadQueueTaskMaster;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
import com.metamx.druid.metrics.MetricsModule;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.http.client.response.ToStringResponseHandler;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceProvider;
import org.joda.time.Duration;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
@ -80,11 +79,9 @@ import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.DBI;
import javax.annotation.Nullable;
import java.net.URL;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.Arrays;
/**
*/
@ -96,89 +93,41 @@ public class MasterMain
{
LogLevelAdjuster.register();
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final Properties props = Initialization.loadProperties();
final ConfigurationObjectFactory configFactory = Config.createFactory(props);
final Lifecycle lifecycle = new Lifecycle();
final HttpClientConfig.Builder httpClientConfigBuilder = HttpClientConfig.builder().withNumConnections(1);
final String emitterTimeout = props.getProperty("druid.emitter.timeOut");
if (emitterTimeout != null) {
httpClientConfigBuilder.withReadTimeout(new Duration(emitterTimeout));
}
final HttpClient httpClient = HttpClientInit.createClient(httpClientConfigBuilder.build(), lifecycle);
final ServiceEmitter emitter = new ServiceEmitter(
PropUtils.getProperty(props, "druid.service"),
PropUtils.getProperty(props, "druid.host"),
Emitters.create(props, httpClient, jsonMapper, lifecycle)
Injector injector = makeInjector(
DruidSecondaryModule.class,
new LifecycleModule(Key.get(MonitorScheduler.class)),
EmitterModule.class,
CuratorModule.class,
MetricsModule.class,
DiscoveryModule.class,
ServerModule.class,
MasterModule.class
);
EmittingLogger.registerEmitter(emitter);
final ObjectMapper jsonMapper = injector.getInstance(ObjectMapper.class);
final ConfigurationObjectFactory configFactory = injector.getInstance(ConfigurationObjectFactory.class);
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
final ServiceEmitter emitter = injector.getInstance(ServiceEmitter.class);
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
CuratorFramework curatorFramework = Initialization.makeCuratorFramework(
serviceDiscoveryConfig,
lifecycle
);
CuratorFramework curatorFramework = injector.getInstance(CuratorFramework.class);
final ZkPathsConfig zkPaths = configFactory.build(ZkPathsConfig.class);
final ExecutorService exec = Executors.newFixedThreadPool(
1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryView-%s").build()
);
ServerInventoryView serverInventoryView = new ServerInventoryView(
configFactory.build(ServerInventoryViewConfig.class), zkPaths, curatorFramework, exec, jsonMapper
);
lifecycle.addManagedInstance(serverInventoryView);
ServerInventoryView serverInventoryView = injector.getInstance(ServerInventoryView.class);
final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
final DatabaseRuleManagerConfig databaseRuleManagerConfig = configFactory.build(DatabaseRuleManagerConfig.class);
final DBI dbi = new DbConnector(dbConnectorConfig).getDBI();
DbConnector.createSegmentTable(dbi, PropUtils.getProperty(props, "druid.database.segmentTable"));
DbConnector.createRuleTable(dbi, PropUtils.getProperty(props, "druid.database.ruleTable"));
DatabaseRuleManager.createDefaultRule(
dbi, databaseRuleManagerConfig.getRuleTable(), databaseRuleManagerConfig.getDefaultDatasource(), jsonMapper
);
final DatabaseSegmentManager databaseSegmentManager = new DatabaseSegmentManager(
jsonMapper,
scheduledExecutorFactory.create(1, "DatabaseSegmentManager-Exec--%d"),
configFactory.build(DatabaseSegmentManagerConfig.class),
dbi
);
final DatabaseRuleManager databaseRuleManager = new DatabaseRuleManager(
jsonMapper,
scheduledExecutorFactory.create(1, "DatabaseRuleManager-Exec--%d"),
databaseRuleManagerConfig,
dbi
);
final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
final MonitorScheduler healthMonitor = new MonitorScheduler(
configFactory.build(MonitorSchedulerConfig.class),
globalScheduledExec,
emitter,
ImmutableList.<Monitor>of(
new JvmMonitor(),
new SysMonitor()
)
);
lifecycle.addManagedInstance(healthMonitor);
final DatabaseSegmentManager databaseSegmentManager = injector.getInstance(DatabaseSegmentManager.class);
final DatabaseRuleManager databaseRuleManager = injector.getInstance(DatabaseRuleManager.class);
final DruidMasterConfig druidMasterConfig = configFactory.build(DruidMasterConfig.class);
final DruidNodeConfig nodeConfig = configFactory.build(DruidNodeConfig.class);
final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
curatorFramework,
serviceDiscoveryConfig,
lifecycle
);
final ServiceAnnouncer serviceAnnouncer = Initialization.makeServiceAnnouncer(
serviceDiscoveryConfig, serviceDiscovery
);
Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, lifecycle);
final ServiceDiscovery<Void> serviceDiscovery = injector.getInstance(Key.get(new TypeLiteral<ServiceDiscovery<Void>>(){}));
final ServiceAnnouncer serviceAnnouncer = injector.getInstance(ServiceAnnouncer.class);
Initialization.announceDefaultService(nodeConfig, serviceAnnouncer, lifecycle);
IndexingServiceClient indexingServiceClient = null;
if (druidMasterConfig.getMergerServiceName() != null) {
@ -187,9 +136,10 @@ public class MasterMain
serviceDiscovery,
lifecycle
);
indexingServiceClient = new IndexingServiceClient(httpClient, jsonMapper, serviceProvider);
// indexingServiceClient = new IndexingServiceClient(httpClient, jsonMapper, serviceProvider); TODO
}
DBI dbi = injector.getInstance(DBI.class);
final ConfigManagerConfig configManagerConfig = configFactory.build(ConfigManagerConfig.class);
DbConnector.createConfigTable(dbi, configManagerConfig.getConfigTable());
JacksonConfigManager configManager = new JacksonConfigManager(
@ -237,7 +187,7 @@ public class MasterMain
)
);
final Injector injector = Guice.createInjector(
final Injector injector2 = Guice.createInjector(
new MasterServletModule(
serverInventoryView,
databaseSegmentManager,
@ -289,7 +239,7 @@ public class MasterMain
final Context root = new Context(server, "/", Context.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addEventListener(new GuiceServletConfig(injector));
root.addEventListener(new GuiceServletConfig(injector2));
root.addFilter(
new FilterHolder(
new RedirectFilter(
@ -304,4 +254,41 @@ public class MasterMain
server.start();
server.join();
}
/**
 * Builds the injector used to bootstrap the node.  A small "base" injector supplies the
 * fundamentals (Druid Guice extensions, Jackson, runtime.properties, the config factory);
 * each argument is then converted into a {@link Module} — Module instances get their
 * members injected by the base injector, Class arguments are instantiated by it — and the
 * final injector is created from the resulting modules.
 *
 * @param modules a mix of {@link Module} instances and {@code Class<? extends Module>} objects
 *
 * @return the fully configured injector
 *
 * @throws ISE if an argument is neither a Module nor a Module class
 */
private static Injector makeInjector(final Object... modules)
{
  final Injector baseInjector = Guice.createInjector(
      new DruidGuiceExtensions(),
      new JacksonModule(),
      new PropertiesModule("runtime.properties"),
      new ConfigFactoryModule()
  );

  return Guice.createInjector(
      Iterables.transform(
          Arrays.asList(modules),
          new Function<Object, Module>()
          {
            @Override
            @SuppressWarnings("unchecked")
            public Module apply(@Nullable Object input)
            {
              if (input instanceof Module) {
                baseInjector.injectMembers(input);
                return (Module) input;
              }
              if (input instanceof Class) {
                if (Module.class.isAssignableFrom((Class) input)) {
                  return baseInjector.getInstance((Class<? extends Module>) input);
                }
                // BUG FIX: previously reported input.getClass() here, which is always
                // java.lang.Class and hid the name of the offending class.
                throw new ISE("Class[%s] does not implement %s", input, Module.class);
              }
              // Null-safe: input is @Nullable, and input.getClass() would NPE on null.
              throw new ISE("Unknown module type[%s]", input == null ? null : input.getClass());
            }
          }
      )
  );
}
}

View File

@ -19,10 +19,10 @@
package com.metamx.druid.http;
import com.google.inject.Inject;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.LoadPeonCallback;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;

View File

@ -19,12 +19,12 @@
package com.metamx.druid.loading;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.Segment;
import javax.inject.Inject;
import java.util.Map;
/**

View File

@ -0,0 +1,77 @@
package com.metamx.druid.metrics;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.logger.Logger;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.guice.LazySingleton;
import com.metamx.druid.guice.ManageLifecycle;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import java.util.List;
/**
* Sets up the {@link MonitorScheduler} to monitor things on a regular schedule. {@link Monitor}s must be explicitly
* bound in order to be loaded.
*/
/**
 * Sets up the {@link MonitorScheduler} to monitor things on a regular schedule.  {@link Monitor}s must be explicitly
 * bound in order to be loaded.
 */
public class MetricsModule implements Module
{
  private static final Logger log = new Logger(MetricsModule.class);

  // Monitor implementations to bind in addition to the always-on Jvm/Sys monitors.
  private final Class<? extends Monitor>[] monitors;

  /**
   * A constructor that takes a list of {@link Monitor} classes to explicitly bind so that they will be instantiated
   *
   * @param monitors list of {@link Monitor} classes to explicitly bind
   */
  public MetricsModule(
      Class<? extends Monitor>... monitors
  )
  {
    this.monitors = monitors;
  }

  @Override
  public void configure(Binder binder)
  {
    binder.bind(JvmMonitor.class).in(LazySingleton.class);
    binder.bind(SysMonitor.class).in(LazySingleton.class);

    for (Class<? extends Monitor> monitor : monitors) {
      binder.bind(monitor).in(LazySingleton.class);
    }
  }

  /**
   * Assembles the scheduler from every Monitor binding registered with the injector.
   */
  @Provides @ManageLifecycle
  public MonitorScheduler getMonitorScheduler(MonitorSchedulerConfig config, ServiceEmitter emitter, Injector injector)
  {
    List<Monitor> monitors = Lists.newArrayList();
    for (Key<?> key : injector.getBindings().keySet()) {
      // BUG FIX: key.getClass() is always com.google.inject.Key, so the original check
      // (Monitor.class.isAssignableFrom(key.getClass())) never matched and no monitors
      // were ever scheduled.  The bound type lives in the key's TypeLiteral.
      if (Monitor.class.isAssignableFrom(key.getTypeLiteral().getRawType())) {
        final Monitor monitor = (Monitor) injector.getInstance(key);

        log.info("Adding monitor[%s]", monitor);
        monitors.add(monitor);
      }
    }

    return new MonitorScheduler(
        config,
        Execs.scheduledSingleThreaded("MonitorScheduler-%s"),
        emitter,
        monitors
    );
  }
}

View File

@ -19,7 +19,6 @@
package com.metamx.druid.metrics;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.coordination.ServerManager;
import com.metamx.emitter.service.ServiceEmitter;

View File

@ -19,10 +19,10 @@
package com.metamx.druid.db;
import com.google.common.base.Suppliers;
import com.google.common.collect.Maps;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -32,7 +32,6 @@ import org.skife.jdbi.v2.tweak.HandleCallback;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
@ -48,21 +47,8 @@ public class DatabaseSegmentManagerTest
dbi = EasyMock.createMock(DBI.class);
manager = new DatabaseSegmentManager(
new DefaultObjectMapper(),
EasyMock.createMock(ScheduledExecutorService.class),
new DatabaseSegmentManagerConfig()
{
@Override
public String getSegmentTable()
{
return null;
}
@Override
public Duration getPollDuration()
{
return null;
}
},
Suppliers.ofInstance(new DatabaseSegmentManagerConfig()),
Suppliers.ofInstance(DbTablesConfig.fromBase("test")),
dbi
);