mirror of https://github.com/apache/druid.git
1) Another whole bunch of changes to annotate things and create Modules and bind stuff. But OMFG, the compute node actually appears to be working!
2) The compute node works with Guice
3) The compute node fires up with Guice and appears to work
4) Did I mention that the compute node, now called historical node, fires up with Guice and appears to work?
This commit is contained in:
parent f4fc8b2f6e
commit 019bb5d453
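The pattern this commit moves the node toward is plain Guice: each concern lives in a Module, and an Injector built from those modules constructs the object graph on demand. A minimal, self-contained sketch of that pattern follows; the module and classes in it are illustrative stand-ins, not classes from this commit.

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Singleton;

public class GuiceWiringSketch
{
  // Stand-in for a node module: binds an interface to a concrete implementation.
  static class ExampleModule extends AbstractModule
  {
    @Override
    protected void configure()
    {
      bind(Runnable.class).to(ExampleTask.class).in(Singleton.class);
    }
  }

  static class ExampleTask implements Runnable
  {
    @Override
    public void run()
    {
      System.out.println("wired up by Guice");
    }
  }

  public static void main(String[] args)
  {
    // The real node boots by composing modules like the ones in this diff
    // (LifecycleModule, ServerModule, HistoricalModule) into a single Injector.
    Injector injector = Guice.createInjector(new ExampleModule());
    injector.getInstance(Runnable.class).run();
  }
}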
@@ -327,8 +327,8 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
final DruidServerConfig serverConfig = getConfigFactory().build(DruidServerConfig.class);
setDruidServerMetadata(
new DruidServerMetadata(
serverConfig.getServerName(),
serverConfig.getHost(),
null, // TODO: serverConfig.getServerName(),
null, // TODO: serverConfig.getHost(),
serverConfig.getMaxSize(),
nodeType,
serverConfig.getTier()
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.initialization.DruidNode;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@@ -45,13 +46,14 @@ public class DruidServer implements Comparable
private volatile long currSize;

public DruidServer(
DruidNode node,
DruidServerConfig config,
String type
)
{
this(
config.getServerName(),
config.getHost(),
node.getHost(),
node.getHost(),
config.getMaxSize(),
type,
config.getTier()
@@ -19,23 +19,28 @@
package com.metamx.druid.client;

import org.skife.config.Config;
import org.skife.config.Default;
import com.fasterxml.jackson.annotation.JsonProperty;

import javax.validation.constraints.Min;

/**
 */
public abstract class DruidServerConfig
public class DruidServerConfig
{
@Config("druid.host")
public abstract String getServerName();
@JsonProperty
@Min(0)
private long maxSize = -1;

@Config("druid.host")
public abstract String getHost();
@JsonProperty
private String tier = "_default_tier";

@Config("druid.server.maxSize")
public abstract long getMaxSize();
public long getMaxSize()
{
return maxSize;
}

@Config("druid.server.tier")
@Default("_default_tier")
public abstract String getTier();
public String getTier()
{
return tier;
}
}
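DruidServerConfig changes here from an abstract skife-config class (@Config on getters) to a plain Jackson-annotated POJO, which later in this diff gets bound with JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class). A rough sketch of the idea behind that style of binding, assuming the binder simply strips the property prefix and lets Jackson populate the annotated fields; the stand-in class and property values are illustrative.

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class JsonConfigSketch
{
  // Stand-in for the new-style DruidServerConfig: plain fields with Jackson annotations and defaults.
  public static class ServerConfigExample
  {
    @JsonProperty
    private long maxSize = -1;

    @JsonProperty
    private String tier = "_default_tier";

    public long getMaxSize() { return maxSize; }
    public String getTier() { return tier; }
  }

  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.setProperty("druid.server.maxSize", "10000000000");
    props.setProperty("druid.server.tier", "hot");

    // Collect the properties under the prefix, strip it, then let Jackson map the
    // remaining keys onto the @JsonProperty fields.
    String prefix = "druid.server.";
    Map<String, Object> flat = new HashMap<>();
    for (String name : props.stringPropertyNames()) {
      if (name.startsWith(prefix)) {
        flat.put(name.substring(prefix.length()), props.getProperty(name));
      }
    }

    ServerConfigExample config = new ObjectMapper().convertValue(flat, ServerConfigExample.class);
    System.out.println(config.getMaxSize() + " / " + config.getTier());
  }
}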
@@ -22,6 +22,7 @@ package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;

@@ -46,6 +47,7 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer

private volatile boolean started = false;

@Inject
public CuratorDataSegmentAnnouncer(
DruidServerMetadata server,
ZkPathsConfig config,
@@ -0,0 +1,14 @@
package com.metamx.druid.http;

import com.google.inject.Binder;
import com.google.inject.Module;

/**
 */
public class ZkPathsModule implements Module
{
@Override
public void configure(Binder binder)
{
}
}
@@ -113,8 +113,6 @@ public class AnnouncerTest extends CuratorTestBase
@Test
public void testSessionKilled() throws Exception
{
final ExecutorService exec = Execs.singleThreaded("test-announcer-sanity-%s");

curator.start();
Announcer announcer = new Announcer(curator, exec);
try {
@@ -0,0 +1,33 @@
package com.fasterxml.jackson.databind.introspect;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.google.inject.BindingAnnotation;
import com.google.inject.Key;

import java.lang.annotation.Annotation;

/**
 */
public class GuiceAnnotationIntrospector extends NopAnnotationIntrospector
{
@Override
public Object findInjectableValueId(AnnotatedMember m)
{
if (m.getAnnotation(JacksonInject.class) == null) {
return null;
}

Annotation guiceAnnotation = null;
for (Annotation annotation : m.getAllAnnotations()._annotations.values()) {
if (annotation.annotationType().isAnnotationPresent(BindingAnnotation.class)) {
guiceAnnotation = annotation;
break;
}
}

if (guiceAnnotation == null) {
return Key.get(m.getGenericType());
}
return Key.get(m.getGenericType(), guiceAnnotation);
}
}
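This new introspector maps a @JacksonInject member to a Guice Key (optionally qualified by a binding annotation), so Jackson can ask the Injector for the value during deserialization. A small sketch of how the pieces added in this commit fit together on an ObjectMapper, with an illustrative bean and binding; the Clock interface and the JSON payload are made up for the example.

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector;
import com.fasterxml.jackson.databind.introspect.GuiceInjectableValues;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;

public class JacksonGuiceSketch
{
  public interface Clock { long now(); }

  // A bean whose "clock" is not in the JSON payload but is pulled from Guice at deserialization time.
  public static class ExampleBean
  {
    @JacksonInject
    public Clock clock;

    @JsonProperty
    public String name;
  }

  public static void main(String[] args) throws Exception
  {
    Injector injector = Guice.createInjector(new AbstractModule()
    {
      @Override
      protected void configure()
      {
        bind(Clock.class).toInstance(new Clock()
        {
          @Override
          public long now() { return System.currentTimeMillis(); }
        });
      }
    });

    // Same wiring as DruidSecondaryModule.setupJackson later in this diff.
    GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
    ObjectMapper mapper = new ObjectMapper();
    mapper.setInjectableValues(new GuiceInjectableValues(injector));
    mapper.setAnnotationIntrospectors(
        new AnnotationIntrospectorPair(guiceIntrospector, mapper.getSerializationConfig().getAnnotationIntrospector()),
        new AnnotationIntrospectorPair(guiceIntrospector, mapper.getDeserializationConfig().getAnnotationIntrospector())
    );

    ExampleBean bean = mapper.readValue("{\"name\":\"segment-1\"}", ExampleBean.class);
    System.out.println(bean.name + " @ " + bean.clock.now());
  }
}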
@@ -3,12 +3,9 @@ package com.fasterxml.jackson.databind.introspect;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.google.inject.BindingAnnotation;
import com.google.inject.Injector;
import com.google.inject.Key;

import java.lang.annotation.Annotation;

/**
 */
public class GuiceInjectableValues extends InjectableValues

@@ -22,22 +19,6 @@ public class GuiceInjectableValues extends InjectableValues
Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
)
{
final AnnotatedMember member = forProperty.getMember();
Annotation guiceAnnotation = null;
for (Annotation annotation : member.getAllAnnotations()._annotations.values()) {
if (annotation.annotationType().isAnnotationPresent(BindingAnnotation.class)) {
guiceAnnotation = annotation;
break;
}
}

final Key<?> key;
if (guiceAnnotation == null) {
key = Key.get(forProperty.getMember().getGenericType());
}
else {
key = Key.get(forProperty.getMember().getGenericType(), guiceAnnotation);
}
return injector.getInstance(key);
return injector.getInstance((Key) valueId);
}
}
@@ -14,19 +14,26 @@ public class DbTablesConfig
return new DbTablesConfig(base, null, null, null, null, null, null);
}

@JsonProperty
@NotNull
private final String base;

@JsonProperty("segments")
private final String segmentsTable;

@JsonProperty("rules")
private final String rulesTable;

@JsonProperty("config")
private final String configTable;

@JsonProperty("tasks")
private final String tasksTable;

@JsonProperty("taskLog")
private final String taskLogTable;

@JsonProperty("taskLock")
private final String taskLockTable;

@JsonCreator

@@ -61,43 +68,36 @@ public class DbTablesConfig
return explicitTableName;
}

@JsonProperty
public String getBase()
{
return base;
}

@JsonProperty("segments")
public String getSegmentsTable()
{
return segmentsTable;
}

@JsonProperty("rules")
public String getRulesTable()
{
return rulesTable;
}

@JsonProperty("config")
public String getConfigTable()
{
return configTable;
}

@JsonProperty("tasks")
public String getTasksTable()
{
return tasksTable;
}

@JsonProperty("taskLog")
public String getTaskLogTable()
{
return taskLogTable;
}

@JsonProperty("taskLock")
public String getTaskLockTable()
{
return taskLockTable;
@@ -1,6 +1,8 @@
package com.metamx.druid.guice;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector;
import com.fasterxml.jackson.databind.introspect.GuiceInjectableValues;
import com.google.inject.Binder;
import com.google.inject.Inject;

@@ -58,14 +60,28 @@ public class DruidSecondaryModule implements Module
@Provides @LazySingleton @Json
public ObjectMapper getJsonMapper(final Injector injector)
{
jsonMapper.setInjectableValues(new GuiceInjectableValues(injector));
setupJackson(injector, jsonMapper);
return jsonMapper;
}

@Provides @LazySingleton @Smile
public ObjectMapper getSmileMapper(Injector injector)
{
smileMapper.setInjectableValues(new GuiceInjectableValues(injector));
setupJackson(injector, smileMapper);
return smileMapper;
}

private void setupJackson(Injector injector, final ObjectMapper mapper) {
final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();

mapper.setInjectableValues(new GuiceInjectableValues(injector));
mapper.setAnnotationIntrospectors(
new AnnotationIntrospectorPair(
guiceIntrospector, mapper.getSerializationConfig().getAnnotationIntrospector()
),
new AnnotationIntrospectorPair(
guiceIntrospector, mapper.getDeserializationConfig().getAnnotationIntrospector()
)
);
}
}
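With the @Json and @Smile providers above, other classes can simply inject the fully configured mapper instead of building their own. A hypothetical consumer, assuming @Json lives in the annotations package used elsewhere in this diff:

package com.metamx.druid.guice;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.druid.guice.annotations.Json; // package assumed for the @Json qualifier

public class ExampleJsonConsumer
{
  private final ObjectMapper jsonMapper;

  // Guice satisfies this with the @Json-qualified mapper provided by DruidSecondaryModule.
  @Inject
  public ExampleJsonConsumer(@Json ObjectMapper jsonMapper)
  {
    this.jsonMapper = jsonMapper;
  }

  public String toJson(Object value) throws Exception
  {
    return jsonMapper.writeValueAsString(value);
  }
}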
@@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.introspect.AnnotatedField;
import com.fasterxml.jackson.databind.introspect.BeanPropertyDefinition;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

@@ -95,7 +96,7 @@ public class JsonConfigurator
final String fieldName = next.getName();
final Field theField = beanClazz.getDeclaredField(fieldName);
JsonProperty annotation = theField.getAnnotation(JsonProperty.class);
pathParts.add(annotation == null || annotation.value() == null ? fieldName : annotation.value());
pathParts.add(annotation == null || Strings.isNullOrEmpty(annotation.value()) ? fieldName : annotation.value());
}
}
}
@@ -36,7 +36,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
 */
public class LifecycleModule implements Module
{
private final LifecycleScope scope = new LifecycleScope();
private final LifecycleScope scope = new LifecycleScope(Lifecycle.Stage.NORMAL);
private final LifecycleScope lastScope = new LifecycleScope(Lifecycle.Stage.LAST);
private final List<Key<?>> eagerClasses = new CopyOnWriteArrayList<Key<?>>();
public boolean configured = false;

@@ -136,6 +137,7 @@ public class LifecycleModule implements Module
synchronized (eagerClasses) {
configured = true;
binder.bindScope(ManageLifecycle.class, scope);
binder.bindScope(ManageLifecycleLast.class, lastScope);
}
}

@@ -153,6 +155,7 @@ public class LifecycleModule implements Module
}
};
scope.setLifecycle(lifecycle);
lastScope.setLifecycle(lifecycle);

return lifecycle;
}
@@ -15,10 +15,16 @@ import java.util.List;
public class LifecycleScope implements Scope
{
private static final Logger log = new Logger(LifecycleScope.class);
private final Lifecycle.Stage stage;

private Lifecycle lifecycle;
private List<Object> instances = Lists.newLinkedList();

public LifecycleScope(Lifecycle.Stage stage)
{
this.stage = stage;
}

public void setLifecycle(Lifecycle lifecycle)
{
this.lifecycle = lifecycle;

@@ -48,7 +54,7 @@ public class LifecycleScope
}
else {
try {
lifecycle.addMaybeStartManagedInstance(retVal);
lifecycle.addMaybeStartManagedInstance(retVal, stage);
}
catch (Exception e) {
log.warn(e, "Caught exception when trying to create a[%s]", key);
@@ -0,0 +1,21 @@
package com.metamx.druid.guice;

import com.google.inject.ScopeAnnotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
 * Marks the object to be managed by {@link com.metamx.common.lifecycle.Lifecycle} and set to be on Stage.LAST
 *
 * This Scope gets defined by {@link com.metamx.druid.guice.LifecycleModule}
 */
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RUNTIME)
@ScopeAnnotation
public @interface ManageLifecycleLast
{
}
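A sketch of how this new scope would be used: a class exposes @LifecycleStart/@LifecycleStop methods and a module binds it in ManageLifecycleLast, the same shape as the CuratorDataSegmentAnnouncer binding added to HistoricalModule later in this diff. The announcer class and module here are illustrative, not from this commit.

package com.metamx.druid.guice;

import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;

public class LifecycleLastSketch
{
  // Illustrative class whose start/stop should be managed on Lifecycle.Stage.LAST.
  public static class ExampleAnnouncer
  {
    @LifecycleStart
    public void start()
    {
      System.out.println("announced");
    }

    @LifecycleStop
    public void stop()
    {
      System.out.println("unannounced");
    }
  }

  // Binding in the ManageLifecycleLast scope hands the start/stop calls to the Lifecycle.
  public static class ExampleModule implements Module
  {
    @Override
    public void configure(Binder binder)
    {
      binder.bind(ExampleAnnouncer.class).in(ManageLifecycleLast.class);
    }
  }
}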
@@ -24,6 +24,10 @@ import com.google.common.collect.Maps;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.indexing.common.actions.TaskActionClient;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.MMappedQueryableIndexFactory;

@@ -31,10 +35,6 @@ import com.metamx.druid.loading.S3DataSegmentPuller;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.indexing.common.actions.TaskActionClient;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.emitter.service.ServiceEmitter;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;

@@ -141,7 +141,7 @@ public class TaskToolbox
new SegmentLoaderConfig()
{
@Override
public File getCacheDirectory()
public File getSegmentLocations()
{
return new File(getTaskWorkDir(), "fetched_segments");
}
pom.xml
@@ -173,17 +173,17 @@
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>3.0</version>
<version>4.0-beta</version>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
<version>3.0</version>
<version>4.0-beta</version>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
<version>3.0</version>
<version>4.0-beta</version>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
@@ -22,10 +22,12 @@ package com.metamx.druid.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.emitter.EmittingLogger;
import org.apache.curator.framework.CuratorFramework;

@@ -47,7 +49,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
private final Object lock = new Object();

private final ObjectMapper jsonMapper;
private final ZkCoordinatorConfig config;
private final SegmentLoaderConfig config;
private final DruidServerMetadata me;
private final DataSegmentAnnouncer announcer;
private final CuratorFramework curator;

@@ -59,9 +61,10 @@ public class ZkCoordinator implements DataSegmentChangeHandler
private volatile PathChildrenCache loadQueueCache;
private volatile boolean started;

@Inject
public ZkCoordinator(
ObjectMapper jsonMapper,
ZkCoordinatorConfig config,
SegmentLoaderConfig config,
ZkPathsConfig zkPaths,
DruidServerMetadata me,
DataSegmentAnnouncer announcer,

@@ -98,7 +101,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
);

try {
config.getSegmentInfoCacheDirectory().mkdirs();
config.getInfoDir().mkdirs();

curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());

@@ -187,7 +190,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler

private void loadCache()
{
File baseDir = config.getSegmentInfoCacheDirectory();
File baseDir = config.getInfoDir();
if (!baseDir.exists()) {
return;
}

@@ -201,7 +204,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
} else {
log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier());

File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
}

@@ -221,7 +224,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
try {
serverManager.loadSegment(segment);

File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}

@@ -253,7 +256,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
try {
serverManager.dropSegment(segment);

File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
}
@@ -5,9 +5,6 @@ import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.druid.client.InventoryView;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewConfig;

@@ -27,7 +24,6 @@ import com.metamx.druid.http.MasterRedirectInfo;
import com.metamx.druid.http.RedirectFilter;
import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.http.RedirectServlet;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterConfig;
import com.metamx.druid.master.LoadQueueTaskMaster;

@@ -46,7 +42,6 @@ public class CoordinatorModule implements Module
public void configure(Binder binder)
{
ConfigProvider.bind(binder, DruidMasterConfig.class);
ConfigProvider.bind(binder, ZkPathsConfig.class);
ConfigProvider.bind(binder, ServerInventoryViewConfig.class);

JsonConfigProvider.bind(binder, "druid.manager.segment", DatabaseSegmentManagerConfig.class);

@@ -110,10 +105,4 @@ public class CoordinatorModule implements Module
{
return new LoadQueueTaskMaster(curator, jsonMapper, Execs.singleThreaded("Master-PeonExec--%d"));
}

@Provides @LazySingleton
public ScheduledExecutorFactory getScheduledExecutorFactory(Lifecycle lifecycle)
{
return ScheduledExecutors.createFactory(lifecycle);
}
}
@@ -15,9 +15,16 @@ import com.metamx.druid.Query;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.coordination.ServerManager;
import com.metamx.druid.coordination.ZkCoordinator;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.guice.annotations.Global;
import com.metamx.druid.guice.annotations.Processing;
import com.metamx.druid.guice.annotations.Self;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.druid.loading.BaseSegmentLoader;
import com.metamx.druid.loading.DataSegmentPuller;
import com.metamx.druid.loading.HdfsDataSegmentPuller;

@@ -48,6 +55,7 @@ import com.metamx.druid.query.timeseries.TimeseriesQuery;
import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.conf.Configuration;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;

@@ -68,10 +76,10 @@ public class HistoricalModule implements Module
@Override
public void configure(Binder binder)
{
ConfigProvider.bind(binder, DruidServerConfig.class);
ConfigProvider.bind(binder, DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing"));
binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class);

JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class);
JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class);

binder.bind(ServerManager.class).in(LazySingleton.class);

@@ -79,7 +87,6 @@ public class HistoricalModule implements Module
binder.bind(SegmentLoader.class).to(BaseSegmentLoader.class).in(LazySingleton.class);
binder.bind(QueryableIndexFactory.class).to(MMappedQueryableIndexFactory.class).in(LazySingleton.class);

final MapBinder<String, DataSegmentPuller> segmentPullerBinder = MapBinder.newMapBinder(
binder, String.class, DataSegmentPuller.class
);

@@ -96,14 +103,24 @@ public class HistoricalModule implements Module

queryFactoryBinder.addBinding(TimeseriesQuery.class).to(TimeseriesQueryRunnerFactory.class).in(LazySingleton.class);
queryFactoryBinder.addBinding(SearchQuery.class).to(SearchQueryRunnerFactory.class).in(LazySingleton.class);
queryFactoryBinder.addBinding(TimeBoundaryQuery.class).to(TimeBoundaryQueryRunnerFactory.class).in(LazySingleton.class);
queryFactoryBinder.addBinding(SegmentMetadataQuery.class).to(SegmentMetadataQueryRunnerFactory.class).in(LazySingleton.class);
queryFactoryBinder.addBinding(TimeBoundaryQuery.class)
.to(TimeBoundaryQueryRunnerFactory.class)
.in(LazySingleton.class);
queryFactoryBinder.addBinding(SegmentMetadataQuery.class)
.to(SegmentMetadataQueryRunnerFactory.class)
.in(LazySingleton.class);

queryFactoryBinder.addBinding(GroupByQuery.class).to(GroupByQueryRunnerFactory.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.query.groupBy", GroupByQueryConfig.class);
binder.bind(GroupByQueryEngine.class).in(LazySingleton.class);

binder.bind(QueryRunnerFactoryConglomerate.class).to(DefaultQueryRunnerFactoryConglomerate.class).in(LazySingleton.class);
binder.bind(QueryRunnerFactoryConglomerate.class)
.to(DefaultQueryRunnerFactoryConglomerate.class)
.in(LazySingleton.class);

binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);

binder.bind(DataSegmentAnnouncer.class).to(CuratorDataSegmentAnnouncer.class).in(ManageLifecycleLast.class);
}

private void bindDeepStorageS3(Binder binder)

@@ -133,6 +150,18 @@ public class HistoricalModule implements Module
ConfigProvider.bind(binder, CassandraDataSegmentConfig.class);
}

@Provides @LazySingleton
public DruidServerMetadata getMetadata(@Self DruidNode node, DruidServerConfig config)
{
return new DruidServerMetadata(node.getHost(), node.getHost(), config.getMaxSize(), "historical", config.getTier());
}

@Provides @ManageLifecycle
public Announcer getAnnouncer(CuratorFramework curator)
{
return new Announcer(curator, Execs.singleThreaded("Announcer-%s"));
}

@Provides @Processing @ManageLifecycle
public ExecutorService getProcessingExecutorService(ExecutorServiceConfig config, ServiceEmitter emitter)
{

@@ -144,10 +173,10 @@ public class HistoricalModule implements Module
}

@Provides @LazySingleton
public RestS3Service getRestS3Service(Supplier<S3CredentialsConfig> config)
public RestS3Service getRestS3Service(S3CredentialsConfig config)
{
try {
return new RestS3Service(new AWSCredentials(config.get().getAccessKey(), config.get().getSecretKey()));
return new RestS3Service(new AWSCredentials(config.getAccessKey(), config.getSecretKey()));
}
catch (S3ServiceException e) {
throw new ProvisionException("Unable to create a RestS3Service", e);
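HistoricalModule uses a MapBinder so each deep-storage option can contribute its own DataSegmentPuller keyed by scheme. A self-contained sketch of that multibinding pattern with stand-in types; Puller, LocalPuller, and S3Puller are illustrative, not the real Druid classes.

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;

import java.util.Map;

public class MapBinderSketch
{
  public interface Puller { String pull(String path); }

  public static class LocalPuller implements Puller
  {
    @Override
    public String pull(String path) { return "local:" + path; }
  }

  public static class S3Puller implements Puller
  {
    @Override
    public String pull(String path) { return "s3:" + path; }
  }

  public static void main(String[] args)
  {
    Injector injector = Guice.createInjector(new AbstractModule()
    {
      @Override
      protected void configure()
      {
        // Each module contributes one entry; consumers inject the whole map and
        // look up an implementation by scheme, like the segment-puller binding above.
        MapBinder<String, Puller> pullers = MapBinder.newMapBinder(binder(), String.class, Puller.class);
        pullers.addBinding("local").to(LocalPuller.class);
        pullers.addBinding("s3_zip").to(S3Puller.class);
      }
    });

    Map<String, Puller> byScheme = injector.getInstance(Key.get(new TypeLiteral<Map<String, Puller>>() {}));
    System.out.println(byScheme.get("s3_zip").pull("bucket/key"));
  }
}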
@@ -8,6 +8,7 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.druid.guice.annotations.Self;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.druid.initialization.ZkPathsConfig;

/**
 */

@@ -16,6 +17,8 @@ public class ServerModule implements Module
@Override
public void configure(Binder binder)
{
ConfigProvider.bind(binder, ZkPathsConfig.class);

JsonConfigProvider.bind(binder, "druid", DruidNode.class, Self.class);
}
@@ -34,7 +34,6 @@ import com.metamx.druid.BaseServerNode;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.coordination.ServerManager;
import com.metamx.druid.coordination.ZkCoordinator;
import com.metamx.druid.coordination.ZkCoordinatorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.jackson.DefaultObjectMapper;

@@ -113,7 +112,7 @@ public class ComputeNode extends BaseServerNode<ComputeNode>

final ZkCoordinator coordinator = new ZkCoordinator(
getJsonMapper(),
getConfigFactory().build(ZkCoordinatorConfig.class),
null, // TODO: getConfigFactory().build(ZkCoordinatorConfig.class),
getZkPaths(),
getDruidServerMetadata(),
getAnnouncer(),
@@ -28,25 +28,29 @@ import java.io.File;
 */
public class SegmentLoaderConfig
{
@JsonProperty("path")
@JsonProperty
@NotNull
private File cacheDirectory;
private String locations;

@JsonProperty("deleteOnRemove")
private boolean deleteOnRemove = true;

public File getCacheDirectory()
{
return cacheDirectory;
}
@JsonProperty
@NotNull
private File infoDir = null;

public void setCacheDirectory(File cacheDirectory)
public String getLocations()
{
this.cacheDirectory = cacheDirectory;
return locations;
}

public boolean isDeleteOnRemove()
{
return deleteOnRemove;
}

public File getInfoDir()
{
return infoDir;
}
}
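SegmentLoaderConfig is now a Jackson-annotated bean bound from the druid.segmentCache prefix (see the JsonConfigProvider.bind call in HistoricalModule above). A quick sketch of populating it directly with an ObjectMapper; the JSON shape mirrors the annotated fields and the paths are made-up example values.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.loading.SegmentLoaderConfig;

public class SegmentLoaderConfigSketch
{
  public static void main(String[] args) throws Exception
  {
    // Hypothetical values: the same shape that druid.segmentCache.* properties reduce to.
    String json = "{\"locations\": \"/tmp/druid/indexCache\", \"infoDir\": \"/tmp/druid/segmentInfo\"}";
    SegmentLoaderConfig config = new ObjectMapper().readValue(json, SegmentLoaderConfig.class);
    System.out.println(config.getLocations() + " / " + config.getInfoDir());
  }
}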
@@ -29,7 +29,9 @@ import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.Segment;
import org.apache.commons.io.FileUtils;

import java.io.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

/**
 */

@@ -57,13 +59,13 @@ public class SingleSegmentLoader implements SegmentLoader
@Override
public boolean isSegmentLoaded(final DataSegment segment)
{
File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
File localStorageDir = new File(config.getLocations(), DataSegmentPusherUtil.getStorageDir(segment));
if (localStorageDir.exists()) {
return true;
}

final File legacyStorageDir = new File(
config.getCacheDirectory(),
config.getLocations(),
DataSegmentPusherUtil.getLegacyStorageDir(segment)
);
return legacyStorageDir.exists();

@@ -80,11 +82,11 @@ public class SingleSegmentLoader implements SegmentLoader

public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
File localStorageDir = new File(config.getLocations(), DataSegmentPusherUtil.getStorageDir(segment));

final String legacyDir = DataSegmentPusherUtil.getLegacyStorageDir(segment);
if (legacyDir != null) {
File legacyStorageDir = new File(config.getCacheDirectory(), legacyDir);
File legacyStorageDir = new File(config.getLocations(), legacyDir);

if (legacyStorageDir.exists()) {
log.info("Found legacyStorageDir[%s], moving to new storage location[%s]", legacyStorageDir, localStorageDir);

@@ -148,7 +150,7 @@ public class SingleSegmentLoader implements SegmentLoader
segment.getShardSpec().getPartitionNum()
);

return new File(config.getCacheDirectory(), outputKey);
return new File(config.getLocations(), outputKey);
}

private void moveToCache(File pulledFile, File cacheFile) throws SegmentLoadingException
@@ -32,6 +32,7 @@ import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.CacheTestSegmentLoader;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.metrics.NoopServiceEmitter;
import com.metamx.druid.query.NoopQueryRunnerFactoryConglomerate;
import com.metamx.druid.shard.NoneShardSpec;

@@ -54,7 +55,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
private ZkCoordinator zkCoordinator;
private ServerManager serverManager;
private DataSegmentAnnouncer announcer;
private File cacheDir;
private File infoDir;
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final Logger log = new Logger(ZkCoordinatorTest.class);

@@ -64,12 +65,12 @@ public class ZkCoordinatorTest extends CuratorTestBase
setupServerAndCurator();
curator.start();
try {
cacheDir = new File(File.createTempFile("blah", "blah2").getParent(), "ZkCoordinatorTest");
cacheDir.mkdirs();
for (File file : cacheDir.listFiles()) {
infoDir = new File(File.createTempFile("blah", "blah2").getParent(), "ZkCoordinatorTest");
infoDir.mkdirs();
for (File file : infoDir.listFiles()) {
file.delete();
}
log.info("Creating tmp test files in [%s]", cacheDir);
log.info("Creating tmp test files in [%s]", infoDir);
}
catch (IOException e) {
throw new RuntimeException(e);

@@ -99,12 +100,11 @@ public class ZkCoordinatorTest extends CuratorTestBase

zkCoordinator = new ZkCoordinator(
jsonMapper,
new ZkCoordinatorConfig()
{
new SegmentLoaderConfig(){
@Override
public File getSegmentInfoCacheDirectory()
public File getInfoDir()
{
return cacheDir;
return infoDir;
}
},
zkPaths,

@@ -155,8 +155,8 @@ public class ZkCoordinatorTest extends CuratorTestBase
deleteSegmentFromCache(segment);
}

Assert.assertEquals(0, cacheDir.listFiles().length);
Assert.assertTrue(cacheDir.delete());
Assert.assertEquals(0, infoDir.listFiles().length);
Assert.assertTrue(infoDir.delete());
}

private DataSegment makeSegment(String dataSource, String version, Interval interval)

@@ -165,7 +165,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
dataSource,
interval,
version,
ImmutableMap.<String, Object>of("version", version, "interval", interval, "cacheDir", cacheDir),
ImmutableMap.<String, Object>of("version", version, "interval", interval, "cacheDir", infoDir),
Arrays.asList("dim1", "dim2", "dim3"),
Arrays.asList("metric1", "metric2"),
new NoneShardSpec(),

@@ -176,12 +176,12 @@ public class ZkCoordinatorTest extends CuratorTestBase

private void writeSegmentToCache(final DataSegment segment) throws IOException
{
if (!cacheDir.exists()) {
cacheDir.mkdir();
if (!infoDir.exists()) {
infoDir.mkdir();
}

File segmentInfoCacheFile = new File(
cacheDir,
infoDir,
segment.getIdentifier()
);
try {

@@ -197,7 +197,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
private void deleteSegmentFromCache(final DataSegment segment) throws IOException
{
File segmentInfoCacheFile = new File(
cacheDir,
infoDir,
segment.getIdentifier()
);
if (segmentInfoCacheFile.exists()) {

@@ -209,8 +209,8 @@ public class ZkCoordinatorTest extends CuratorTestBase

private void checkCache(List<DataSegment> segments) throws IOException
{
Assert.assertTrue(cacheDir.exists());
File[] files = cacheDir.listFiles();
Assert.assertTrue(infoDir.exists());
File[] files = infoDir.listFiles();

List<File> sortedFiles = Lists.newArrayList(files);
Collections.sort(sortedFiles);
@@ -20,7 +20,10 @@ import com.metamx.druid.initialization.JettyServerModule;
import com.metamx.druid.metrics.MetricsModule;
import com.metamx.druid.metrics.ServerMonitor;
import io.airlift.command.Command;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;

@@ -66,6 +69,10 @@ public class CliHistorical extends ServerRunnable

root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(new ServletHolder(injector.getInstance(QueryServlet.class)), "/druid/v2/*");

final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{root, new DefaultHandler()});
server.setHandler(handlerList);
}
}
}