Fix issues with bindings and handling extensions

The way the Guice bindings were setup previously, each process only had bindings
for the things it cared about.  This became problematic when adding extension modules
that bound everything that they could possibly need expecting that the processes would
only instantiate what they actually do need.  Guice tries to fail-fast and verifies that all
 bindings exist before it does anything, which is a problem because the extension bind
 some objects that don't necessarily have all of their dependencies bound in all processes.

The fix for this is to build a single Injector with all bindings in it and let each of the
 processes only load the things that they care about.  This also requires the use of
 Module overrides and other such interesting things, which are node done.

 In doing the fix, I also swapped out the way that the DataSegmentPusher/Puller stuff is bound, as well as made the Cassandra stuff fail if its settings are not provided.  This all of a sudden made all of the things require Cassandra's settings, so I migrated the Cassandra deep storage stuff into its own module.

 In doing these changes, I also discovered that some properties weren't properly converting for the ConvertProperties command (specifically, the properties related to data segment loading and pushing), so I fixed that.
This commit is contained in:
cheddar 2013-09-20 17:45:01 -05:00
parent 46631bf409
commit 5712b29c8c
53 changed files with 1276 additions and 1373 deletions

53
cassandra-storage/pom.xml Normal file
View File

@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012, 2013 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-cassandra-storage</artifactId>
<name>druid-cassandra-storage</name>
<description>druid-cassandra-storage</description>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.6.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.netflix.astyanax</groupId>
<artifactId>astyanax</artifactId>
<version>1.0.1</version>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -21,18 +21,22 @@ package io.druid.segment.loading.cassandra;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
/**
* Cassandra Config
*
* @author boneill42
*/
public abstract class CassandraDataSegmentConfig
public class CassandraDataSegmentConfig
{
@JsonProperty
public String host = "";
@NotNull
public String host = null;
@JsonProperty
public String keyspace = "";
@NotNull
public String keyspace = null;
public String getKeyspace()
{

View File

@ -0,0 +1,59 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading.cassandra;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import io.druid.guice.DruidBinders;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
import io.druid.initialization.DruidModule;
import io.druid.segment.loading.DataSegmentPusher;
import java.util.List;
/**
*/
public class CassandraDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
DruidBinders.dataSegmentPullerBinder(binder)
.addBinding("c*")
.to(CassandraDataSegmentPuller.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class))
.addBinding("c*")
.to(CassandraDataSegmentPusher.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.pusher", CassandraDataSegmentConfig.class);
}
}

View File

@ -0,0 +1 @@
io.druid.segment.loading.cassandra.CassandraDruidModule

View File

@ -68,7 +68,6 @@ public class DruidSecondaryModule implements Module
@Override
public void configure(Binder binder)
{
binder.requireExplicitBindings();
binder.install(new DruidGuiceExtensions());
binder.bind(Properties.class).toInstance(properties);
binder.bind(ConfigurationObjectFactory.class).toInstance(factory);

View File

@ -1,163 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package druid.examples.guice;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.TypeLiteral;
import com.metamx.common.guava.LazySequence;
import com.metamx.common.logger.Logger;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import io.druid.client.ServerView;
import io.druid.guice.FireDepartmentsProvider;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.NoopSegmentPublisherProvider;
import io.druid.guice.RealtimeManagerConfig;
import io.druid.initialization.DruidModule;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.RealtimeManager;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
/**
*/
public class RealtimeExampleModule implements DruidModule
{
private static final Logger log = new Logger(RealtimeExampleModule.class);
@Override
public void configure(Binder binder)
{
binder.bind(SegmentPublisher.class).toProvider(NoopSegmentPublisherProvider.class);
binder.bind(DataSegmentPusher.class).to(NoopDataSegmentPusher.class);
binder.bind(DataSegmentAnnouncer.class).to(NoopDataSegmentAnnouncer.class);
binder.bind(InventoryView.class).to(NoopInventoryView.class);
binder.bind(ServerView.class).to(NoopServerView.class);
JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);
binder.bind(
new TypeLiteral<List<FireDepartment>>()
{
}
).toProvider(FireDepartmentsProvider.class).in(LazySingleton.class);
binder.bind(RealtimeManager.class).in(ManageLifecycle.class);
}
@Override
public List<com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return Arrays.<com.fasterxml.jackson.databind.Module>asList(
new SimpleModule("RealtimeExampleModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream")
)
);
}
private static class NoopServerView implements ServerView
{
@Override
public void registerServerCallback(
Executor exec, ServerCallback callback
)
{
// do nothing
}
@Override
public void registerSegmentCallback(
Executor exec, SegmentCallback callback
)
{
// do nothing
}
}
private static class NoopInventoryView implements InventoryView
{
@Override
public DruidServer getInventoryValue(String string)
{
return null;
}
@Override
public Iterable<DruidServer> getInventory()
{
return ImmutableList.of();
}
}
private static class NoopDataSegmentPusher implements DataSegmentPusher
{
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
return segment;
}
}
private static class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer
{
@Override
public void announceSegment(DataSegment segment) throws IOException
{
// do nothing
}
@Override
public void unannounceSegment(DataSegment segment) throws IOException
{
// do nothing
}
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
// do nothing
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
// do nothing
}
}
}

View File

@ -1,62 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import io.druid.guice.annotations.Self;
import io.druid.indexing.coordinator.ForkingTaskRunner;
import io.druid.indexing.coordinator.TaskRunner;
import io.druid.indexing.worker.Worker;
import io.druid.indexing.worker.WorkerCuratorCoordinator;
import io.druid.indexing.worker.WorkerTaskMonitor;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.server.DruidNode;
/**
*/
public class MiddleManagerModule implements Module
{
@Override
public void configure(Binder binder)
{
IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder);
JsonConfigProvider.bind(binder, "druid.worker", WorkerConfig.class);
binder.bind(TaskRunner.class).to(ForkingTaskRunner.class);
binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);
binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class);
binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class);
}
@Provides @LazySingleton
public Worker getWorker(@Self DruidNode node, WorkerConfig config)
{
return new Worker(
node.getHost(),
config.getIp(),
config.getCapacity(),
config.getVersion()
);
}
}

View File

@ -1,151 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskLogs;
import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer;
import io.druid.indexing.coordinator.DbTaskStorage;
import io.druid.indexing.coordinator.ForkingTaskRunnerFactory;
import io.druid.indexing.coordinator.HeapMemoryTaskStorage;
import io.druid.indexing.coordinator.IndexerDBCoordinator;
import io.druid.indexing.coordinator.RemoteTaskRunnerFactory;
import io.druid.indexing.coordinator.TaskLockbox;
import io.druid.indexing.coordinator.TaskMaster;
import io.druid.indexing.coordinator.TaskQueue;
import io.druid.indexing.coordinator.TaskRunnerFactory;
import io.druid.indexing.coordinator.TaskStorage;
import io.druid.indexing.coordinator.TaskStorageQueryAdapter;
import io.druid.indexing.coordinator.http.OverlordRedirectInfo;
import io.druid.indexing.coordinator.scaling.AutoScalingStrategy;
import io.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy;
import io.druid.indexing.coordinator.scaling.NoopAutoScalingStrategy;
import io.druid.indexing.coordinator.scaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory;
import io.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactoryImpl;
import io.druid.indexing.coordinator.scaling.ResourceManagementStrategy;
import io.druid.indexing.coordinator.scaling.SimpleResourceManagementConfig;
import io.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy;
import io.druid.indexing.coordinator.setup.WorkerSetupData;
import io.druid.server.http.RedirectFilter;
import io.druid.server.http.RedirectInfo;
import java.util.List;
/**
*/
public class OverlordModule implements Module
{
@Override
public void configure(Binder binder)
{
binder.bind(TaskMaster.class).in(ManageLifecycle.class);
binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class);
binder.bind(new TypeLiteral<List<TaskLogStreamer>>(){})
.toProvider(
new ListProvider<TaskLogStreamer>()
.add(TaskRunnerTaskLogStreamer.class)
.add(TaskLogs.class)
)
.in(LazySingleton.class);
binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class);
binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
binder.bind(TaskQueue.class).in(LazySingleton.class); // Lifecycle managed by TaskMaster instead
binder.bind(IndexerDBCoordinator.class).in(LazySingleton.class);
binder.bind(TaskLockbox.class).in(LazySingleton.class);
binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class);
binder.bind(ResourceManagementSchedulerFactory.class)
.to(ResourceManagementSchedulerFactoryImpl.class)
.in(LazySingleton.class);
configureTaskStorage(binder);
configureRunners(binder);
configureAutoscale(binder);
binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(OverlordRedirectInfo.class).in(LazySingleton.class);
}
private void configureTaskStorage(Binder binder)
{
PolyBind.createChoice(
binder, "druid.indexer.storage.type", Key.get(TaskStorage.class), Key.get(HeapMemoryTaskStorage.class)
);
final MapBinder<String, TaskStorage> storageBinder = PolyBind.optionBinder(binder, Key.get(TaskStorage.class));
storageBinder.addBinding("local").to(HeapMemoryTaskStorage.class);
binder.bind(HeapMemoryTaskStorage.class).in(LazySingleton.class);
storageBinder.addBinding("db").to(DbTaskStorage.class);
binder.bind(DbTaskStorage.class).in(LazySingleton.class);
}
private void configureRunners(Binder binder)
{
PolyBind.createChoice(
binder, "druid.indexer.runner.type", Key.get(TaskRunnerFactory.class), Key.get(ForkingTaskRunnerFactory.class)
);
final MapBinder<String, TaskRunnerFactory> biddy = PolyBind.optionBinder(binder, Key.get(TaskRunnerFactory.class));
IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder);
biddy.addBinding("local").to(ForkingTaskRunnerFactory.class);
binder.bind(ForkingTaskRunnerFactory.class).in(LazySingleton.class);
biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
}
private void configureAutoscale(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class);
binder.bind(ResourceManagementStrategy.class).to(SimpleResourceManagementStrategy.class).in(LazySingleton.class);
JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null);
PolyBind.createChoice(
binder,
"druid.indexer.autoscale.strategy",
Key.get(AutoScalingStrategy.class),
Key.get(NoopAutoScalingStrategy.class)
);
final MapBinder<String, AutoScalingStrategy> autoScalingBinder = PolyBind.optionBinder(
binder, Key.get(AutoScalingStrategy.class)
);
autoScalingBinder.addBinding("ec2").to(EC2AutoScalingStrategy.class);
binder.bind(EC2AutoScalingStrategy.class).in(LazySingleton.class);
autoScalingBinder.addBinding("noop").to(NoopAutoScalingStrategy.class);
binder.bind(NoopAutoScalingStrategy.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class);
}
}

View File

@ -1,86 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.indexing.common.index.EventReceivingChatHandlerProvider;
import io.druid.indexing.common.index.NoopChatHandlerProvider;
import io.druid.indexing.coordinator.TaskRunner;
import io.druid.indexing.coordinator.ThreadPoolTaskRunner;
import io.druid.indexing.worker.executor.ExecutorLifecycle;
import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.S3DataSegmentKiller;
/**
*/
public class PeonModule implements Module
{
private final ExecutorLifecycleConfig config;
public PeonModule(
ExecutorLifecycleConfig config
)
{
this.config = config;
}
@Override
public void configure(Binder binder)
{
PolyBind.createChoice(
binder,
"druid.indexer.task.chathandler.type",
Key.get(ChatHandlerProvider.class),
Key.get(NoopChatHandlerProvider.class)
);
final MapBinder<String, ChatHandlerProvider> handlerProviderBinder = PolyBind.optionBinder(
binder, Key.get(ChatHandlerProvider.class)
);
handlerProviderBinder.addBinding("curator").to(EventReceivingChatHandlerProvider.class);
handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class);
binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
JsonConfigProvider.bind(binder, "druid.worker.taskActionClient.retry", RetryPolicyConfig.class);
binder.bind(TaskActionClientFactory.class).to(RemoteTaskActionClientFactory.class).in(LazySingleton.class);
binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
binder.bind(DataSegmentKiller.class).to(S3DataSegmentKiller.class).in(LazySingleton.class);
binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
binder.bind(ExecutorLifecycleConfig.class).toInstance(config);
binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class).in(LazySingleton.class);
binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class);
}
}

View File

@ -17,27 +17,35 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
package io.druid.indexing.common;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentPusherConfig;
import com.google.inject.Inject;
import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.StorageLocationConfig;
import javax.validation.constraints.NotNull;
import java.io.File;
import java.util.Arrays;
/**
*/
public class LocalDataSegmentPusherProvider extends LocalDataSegmentPusherConfig implements DataSegmentPusherProvider
public class SegmentLoaderFactory
{
@JacksonInject
@NotNull
private ObjectMapper jsonMapper = null;
private final OmniSegmentLoader loader;
@Override
public DataSegmentPusher get()
@Inject
public SegmentLoaderFactory(
OmniSegmentLoader loader
)
{
return new LocalDataSegmentPusher(this, jsonMapper);
this.loader = loader;
}
public SegmentLoader manufacturate(File storageDir)
{
return loader.withConfig(
new SegmentLoaderConfig().withLocations(Arrays.asList(new StorageLocationConfig().setPath(storageDir)))
);
}
}

View File

@ -59,6 +59,7 @@ public class TaskToolbox
private final ExecutorService queryExecutorService;
private final SegmentLoader segmentLoader;
private final ObjectMapper objectMapper;
private final File taskWorkDir;
public TaskToolbox(
TaskConfig config,
@ -73,7 +74,8 @@ public class TaskToolbox
ExecutorService queryExecutorService,
MonitorScheduler monitorScheduler,
SegmentLoader segmentLoader,
ObjectMapper objectMapper
ObjectMapper objectMapper,
final File taskWorkDir
)
{
this.config = config;
@ -89,6 +91,7 @@ public class TaskToolbox
this.monitorScheduler = monitorScheduler;
this.segmentLoader = segmentLoader;
this.objectMapper = objectMapper;
this.taskWorkDir = taskWorkDir;
}
public TaskConfig getConfig()
@ -159,6 +162,6 @@ public class TaskToolbox
public File getTaskWorkDir()
{
return new File(new File(config.getBaseTaskDir(), task.getId()), "work");
return taskWorkDir;
}
}

View File

@ -31,9 +31,9 @@ import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.SegmentLoader;
import io.druid.server.coordination.DataSegmentAnnouncer;
import java.io.File;
import java.util.concurrent.ExecutorService;
/**
@ -51,7 +51,7 @@ public class TaskToolboxFactory
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final ExecutorService queryExecutorService;
private final MonitorScheduler monitorScheduler;
private final SegmentLoader segmentLoader;
private final SegmentLoaderFactory segmentLoaderFactory;
private final ObjectMapper objectMapper;
@Inject
@ -66,7 +66,7 @@ public class TaskToolboxFactory
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@Processing ExecutorService queryExecutorService,
MonitorScheduler monitorScheduler,
SegmentLoader segmentLoader,
SegmentLoaderFactory segmentLoaderFactory,
ObjectMapper objectMapper
)
{
@ -80,12 +80,14 @@ public class TaskToolboxFactory
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.queryExecutorService = queryExecutorService;
this.monitorScheduler = monitorScheduler;
this.segmentLoader = segmentLoader;
this.segmentLoaderFactory = segmentLoaderFactory;
this.objectMapper = objectMapper;
}
public TaskToolbox build(Task task)
{
final File taskWorkDir = new File(new File(config.getBaseTaskDir(), task.getId()), "work");
return new TaskToolbox(
config,
task,
@ -98,8 +100,9 @@ public class TaskToolboxFactory
queryRunnerFactoryConglomerate,
queryExecutorService,
monitorScheduler,
segmentLoader,
objectMapper
segmentLoaderFactory.manufacturate(taskWorkDir),
objectMapper,
taskWorkDir
);
}
}

View File

@ -56,8 +56,12 @@ public class NoopTask extends AbstractTask
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final int sleepTime = 2500;
log.info("Running noop task[%s]", getId());
Thread.sleep(2500);
log.info("Sleeping for %,d millis.", sleepTime);
Thread.sleep(sleepTime);
log.info("Woke up!");
return TaskStatus.success(getId());
}
}

View File

@ -51,6 +51,7 @@
<module>server</module>
<module>services</module>
<module>processing</module>
<module>cassandra-storage</module>
</modules>
<dependencyManagement>

View File

@ -199,11 +199,6 @@
<artifactId>java-xmlbuilder</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.netflix.astyanax</groupId>
<artifactId>astyanax</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>

View File

@ -19,10 +19,8 @@
package io.druid.curator.discovery;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Injector;
@ -33,18 +31,30 @@ import com.google.inject.TypeLiteral;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import com.metamx.common.lifecycle.Lifecycle;
import io.druid.guice.DruidBinders;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.KeyHolder;
import io.druid.guice.LazySingleton;
import io.druid.server.DruidNode;
import io.druid.server.initialization.CuratorDiscoveryConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ProviderStrategy;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceCacheBuilder;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.curator.x.discovery.ServiceProviderBuilder;
import org.apache.curator.x.discovery.details.ServiceCacheListener;
import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
* The DiscoveryModule allows for the registration of Keys of DruidNode objects, which it intends to be
@ -57,21 +67,16 @@ public class DiscoveryModule implements Module
{
private static final String NAME = "DiscoveryModule:internal";
public final List<Key<Supplier<DruidNode>>> nodesToAnnounce = new CopyOnWriteArrayList<Key<Supplier<DruidNode>>>();
public boolean configured = false;
/**
* Requests that the un-annotated DruidNode instance be injected and published as part of the lifecycle.
*
* That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle
*
* @return this, for chaining.
*/
public DiscoveryModule registerDefault()
public static void registerDefault(Binder binder)
{
return registerKey(Key.get(new TypeLiteral<Supplier<DruidNode>>(){}));
registerKey(binder, Key.get(new TypeLiteral<DruidNode>(){}));
}
/**
@ -82,11 +87,10 @@ public class DiscoveryModule implements Module
* Announcement will happen in the LAST stage of the Lifecycle
*
* @param annotation The annotation instance to use in finding the DruidNode instance, usually a Named annotation
* @return this, for chaining.
*/
public DiscoveryModule register(Annotation annotation)
public static void register(Binder binder, Annotation annotation)
{
return registerKey(Key.get(new TypeLiteral<Supplier<DruidNode>>(){}, annotation));
registerKey(binder, Key.get(new TypeLiteral<DruidNode>(){}, annotation));
}
/**
@ -97,11 +101,10 @@ public class DiscoveryModule implements Module
* Announcement will happen in the LAST stage of the Lifecycle
*
* @param annotation The annotation class to use in finding the DruidNode instance
* @return this, for chaining
*/
public DiscoveryModule register(Class<? extends Annotation> annotation)
public static void register(Binder binder, Class<? extends Annotation> annotation)
{
return registerKey(Key.get(new TypeLiteral<Supplier<DruidNode>>(){}, annotation));
registerKey(binder, Key.get(new TypeLiteral<DruidNode>(){}, annotation));
}
/**
@ -112,67 +115,53 @@ public class DiscoveryModule implements Module
* Announcement will happen in the LAST stage of the Lifecycle
*
* @param key The key to use in finding the DruidNode instance
* @return this, for chaining
*/
public DiscoveryModule registerKey(Key<Supplier<DruidNode>> key)
public static void registerKey(Binder binder, Key<DruidNode> key)
{
synchronized (nodesToAnnounce) {
Preconditions.checkState(!configured, "Cannot register key[%s] after configuration.", key);
}
nodesToAnnounce.add(key);
return this;
DruidBinders.discoveryAnnouncementBinder(binder).addBinding().toInstance(new KeyHolder<>(key));
}
@Override
public void configure(Binder binder)
{
synchronized (nodesToAnnounce) {
configured = true;
JsonConfigProvider.bind(binder, "druid.discovery.curator", CuratorDiscoveryConfig.class);
binder.bind(CuratorServiceAnnouncer.class).in(LazySingleton.class);
// Build the binder so that it will at a minimum inject an empty set.
DruidBinders.discoveryAnnouncementBinder(binder);
// We bind this eagerly so that it gets instantiated and registers stuff with Lifecycle as a side-effect
binder.bind(ServiceAnnouncer.class)
.to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME)))
.asEagerSingleton();
}
}
@Provides @LazySingleton @Named(NAME)
public CuratorServiceAnnouncer getServiceAnnouncer(
final CuratorServiceAnnouncer announcer,
final Injector injector,
final Set<KeyHolder<DruidNode>> nodesToAnnounce,
final Lifecycle lifecycle
)
{
lifecycle.addHandler(
new Lifecycle.Handler()
{
private volatile List<Supplier<DruidNode>> nodes = null;
private volatile List<DruidNode> nodes = null;
@Override
public void start() throws Exception
{
if (nodes == null) {
nodes = Lists.transform(
nodesToAnnounce,
new Function<Key<Supplier<DruidNode>>, Supplier<DruidNode>>()
{
@Nullable
@Override
public Supplier<DruidNode> apply(
@Nullable Key<Supplier<DruidNode>> input
)
{
return injector.getInstance(input);
nodes = Lists.newArrayList();
for (KeyHolder<DruidNode> holder : nodesToAnnounce) {
nodes.add(injector.getInstance(holder.getKey()));
}
}
);
}
for (Supplier<DruidNode> node : nodes) {
announcer.announce(node.get());
for (DruidNode node : nodes) {
announcer.announce(node);
}
}
@ -180,8 +169,8 @@ public class DiscoveryModule implements Module
public void stop()
{
if (nodes != null) {
for (Supplier<DruidNode> node : nodes) {
announcer.unannounce(node.get());
for (DruidNode node : nodes) {
announcer.unannounce(node);
}
}
}
@ -195,13 +184,17 @@ public class DiscoveryModule implements Module
@Provides @LazySingleton
public ServiceDiscovery<Void> getServiceDiscovery(
CuratorFramework curator,
Supplier<CuratorDiscoveryConfig> config,
CuratorDiscoveryConfig config,
Lifecycle lifecycle
) throws Exception
{
if (!config.useDiscovery()) {
return new NoopServiceDiscovery<>();
}
final ServiceDiscovery<Void> serviceDiscovery =
ServiceDiscoveryBuilder.builder(Void.class)
.basePath(config.get().getPath())
.basePath(config.getPath())
.client(curator)
.build();
@ -230,4 +223,183 @@ public class DiscoveryModule implements Module
return serviceDiscovery;
}
private static class NoopServiceDiscovery<T> implements ServiceDiscovery<T>
{
@Override
public void start() throws Exception
{
}
@Override
public void registerService(ServiceInstance<T> service) throws Exception
{
}
@Override
public void updateService(ServiceInstance<T> service) throws Exception
{
}
@Override
public void unregisterService(ServiceInstance<T> service) throws Exception
{
}
@Override
public ServiceCacheBuilder<T> serviceCacheBuilder()
{
return new NoopServiceCacheBuilder<>();
}
@Override
public Collection<String> queryForNames() throws Exception
{
return ImmutableList.of();
}
@Override
public Collection<ServiceInstance<T>> queryForInstances(String name) throws Exception
{
return ImmutableList.of();
}
@Override
public ServiceInstance<T> queryForInstance(String name, String id) throws Exception
{
return null;
}
@Override
public ServiceProviderBuilder<T> serviceProviderBuilder()
{
return new NoopServiceProviderBuilder<>();
}
@Override
public void close() throws IOException
{
}
}
private static class NoopServiceCacheBuilder<T> implements ServiceCacheBuilder<T>
{
@Override
public ServiceCache<T> build()
{
return new NoopServiceCache<>();
}
@Override
public ServiceCacheBuilder<T> name(String name)
{
return this;
}
@Override
public ServiceCacheBuilder<T> threadFactory(ThreadFactory threadFactory)
{
return this;
}
private static class NoopServiceCache<T> implements ServiceCache<T>
{
@Override
public List<ServiceInstance<T>> getInstances()
{
return ImmutableList.of();
}
@Override
public void start() throws Exception
{
}
@Override
public void close() throws IOException
{
}
@Override
public void addListener(ServiceCacheListener listener)
{
}
@Override
public void addListener(
ServiceCacheListener listener, Executor executor
)
{
}
@Override
public void removeListener(ServiceCacheListener listener)
{
}
}
}
private static class NoopServiceProviderBuilder<T> implements ServiceProviderBuilder<T>
{
@Override
public ServiceProvider<T> build()
{
return new NoopServiceProvider<>();
}
@Override
public ServiceProviderBuilder<T> serviceName(String serviceName)
{
return this;
}
@Override
public ServiceProviderBuilder<T> providerStrategy(ProviderStrategy<T> providerStrategy)
{
return this;
}
@Override
public ServiceProviderBuilder<T> threadFactory(ThreadFactory threadFactory)
{
return this;
}
@Override
public ServiceProviderBuilder<T> refreshPaddingMs(int refreshPaddingMs)
{
return this;
}
}
private static class NoopServiceProvider<T> implements ServiceProvider<T>
{
@Override
public void start() throws Exception
{
}
@Override
public ServiceInstance<T> getInstance() throws Exception
{
return null;
}
@Override
public void close() throws IOException
{
}
}
}

View File

@ -1,75 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.google.common.base.Supplier;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import io.druid.client.BrokerServerView;
import io.druid.client.CachingClusteredClient;
import io.druid.client.TimelineServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheProvider;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.guice.annotations.Global;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.QueryToolChestWarehouse;
import java.nio.ByteBuffer;
/**
*/
public class BrokerModule implements Module
{
@Override
public void configure(Binder binder)
{
binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);
binder.bind(CachingClusteredClient.class).in(LazySingleton.class);
binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class);
binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class);
// This is a workaround and needs to be made better in the near future.
binder.bind(
new TypeLiteral<StupidPool<ByteBuffer>>()
{
}
).annotatedWith(Global.class).toInstance(new NoopStupidPool(null));
}
private static class NoopStupidPool extends StupidPool<ByteBuffer>
{
public NoopStupidPool(Supplier<ByteBuffer> generator)
{
super(generator);
}
@Override
public ResourceHolder<ByteBuffer> take()
{
throw new UnsupportedOperationException();
}
}
}

View File

@ -1,47 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.cassandra.CassandraDataSegmentConfig;
import io.druid.segment.loading.cassandra.CassandraDataSegmentPusher;
import javax.validation.constraints.NotNull;
/**
*/
public class CassandraDataSegmentPusherProvider implements DataSegmentPusherProvider
{
@JacksonInject
@NotNull
private CassandraDataSegmentConfig config = null;
@JacksonInject
@NotNull
private ObjectMapper jsonMapper = null;
@Override
public DataSegmentPusher get()
{
return new CassandraDataSegmentPusher(config, jsonMapper);
}
}

View File

@ -1,80 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.db.DatabaseRuleManager;
import io.druid.db.DatabaseRuleManagerConfig;
import io.druid.db.DatabaseRuleManagerProvider;
import io.druid.db.DatabaseSegmentManager;
import io.druid.db.DatabaseSegmentManagerConfig;
import io.druid.db.DatabaseSegmentManagerProvider;
import io.druid.server.http.MasterRedirectInfo;
import io.druid.server.http.RedirectFilter;
import io.druid.server.http.RedirectInfo;
import io.druid.server.http.RedirectServlet;
import io.druid.server.master.DruidMaster;
import io.druid.server.master.DruidMasterConfig;
import io.druid.server.master.LoadQueueTaskMaster;
import org.apache.curator.framework.CuratorFramework;
/**
*/
public class CoordinatorModule implements Module
{
@Override
public void configure(Binder binder)
{
ConfigProvider.bind(binder, DruidMasterConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.segment", DatabaseSegmentManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.rules", DatabaseRuleManagerConfig.class);
binder.bind(RedirectServlet.class).in(LazySingleton.class);
binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(DatabaseSegmentManager.class)
.toProvider(DatabaseSegmentManagerProvider.class)
.in(ManageLifecycle.class);
binder.bind(DatabaseRuleManager.class)
.toProvider(DatabaseRuleManagerProvider.class)
.in(ManageLifecycle.class);
binder.bind(IndexingServiceClient.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(MasterRedirectInfo.class).in(LazySingleton.class);
binder.bind(DruidMaster.class);
}
@Provides @LazySingleton
public LoadQueueTaskMaster getLoadQueueTaskMaster(
CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, DruidMasterConfig config
)
{
return new LoadQueueTaskMaster(curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"), config);
}
}

View File

@ -1,48 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.HdfsDataSegmentPusherConfig;
import io.druid.segment.loading.S3DataSegmentPusherConfig;
import io.druid.segment.loading.cassandra.CassandraDataSegmentConfig;
import org.apache.hadoop.conf.Configuration;
/**
*/
public class DataSegmentPusherModule implements Module
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.pusher", DataSegmentPusherProvider.class);
JsonConfigProvider.bind(binder, "druid.pusher.s3", S3DataSegmentPusherConfig.class);
binder.bind(Configuration.class).toInstance(new Configuration());
JsonConfigProvider.bind(binder, "druid.pusher.hdfs", HdfsDataSegmentPusherConfig.class);
JsonConfigProvider.bind(binder, "druid.pusher.cassandra", CassandraDataSegmentConfig.class);
binder.bind(DataSegmentPusher.class).toProvider(DataSegmentPusherProvider.class);
}
}

View File

@ -1,37 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.inject.Provider;
import io.druid.segment.loading.DataSegmentPusher;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = LocalDataSegmentPusherProvider.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "s3_zip", value = S3DataSegmentPusherProvider.class),
@JsonSubTypes.Type(name = "hdfs", value = HdfsDataSegmentPusherProvider.class),
@JsonSubTypes.Type(name = "c*", value = CassandraDataSegmentPusherProvider.class)
})
public interface DataSegmentPusherProvider extends Provider<DataSegmentPusher>
{
}

View File

@ -20,19 +20,25 @@
package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.HdfsDataSegmentPuller;
import io.druid.segment.loading.HdfsDataSegmentPusher;
import io.druid.segment.loading.HdfsDataSegmentPusherConfig;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.loading.LocalDataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentPusherConfig;
import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.S3DataSegmentPuller;
import io.druid.segment.loading.S3DataSegmentPusher;
import io.druid.segment.loading.S3DataSegmentPusherConfig;
import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.cassandra.CassandraDataSegmentConfig;
import io.druid.segment.loading.cassandra.CassandraDataSegmentPuller;
import org.apache.hadoop.conf.Configuration;
/**
*/
public class DataSegmentPullerModule implements Module
public class DataSegmentPusherPullerModule implements Module
{
@Override
public void configure(Binder binder)
@ -42,7 +48,10 @@ public class DataSegmentPullerModule implements Module
bindDeepStorageLocal(binder);
bindDeepStorageS3(binder);
bindDeepStorageHdfs(binder);
bindDeepStorageCassandra(binder);
PolyBind.createChoice(
binder, "druid.pusher.type", Key.get(DataSegmentPusher.class), Key.get(LocalDataSegmentPusher.class)
);
}
private static void bindDeepStorageLocal(Binder binder)
@ -51,6 +60,12 @@ public class DataSegmentPullerModule implements Module
.addBinding("local")
.to(LocalDataSegmentPuller.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class))
.addBinding("local")
.to(LocalDataSegmentPusher.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.pusher", LocalDataSegmentPusherConfig.class);
}
private static void bindDeepStorageS3(Binder binder)
@ -59,6 +74,12 @@ public class DataSegmentPullerModule implements Module
.addBinding("s3_zip")
.to(S3DataSegmentPuller.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class))
.addBinding("s3")
.to(S3DataSegmentPusher.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.pusher", S3DataSegmentPusherConfig.class);
}
private static void bindDeepStorageHdfs(Binder binder)
@ -67,15 +88,13 @@ public class DataSegmentPullerModule implements Module
.addBinding("hdfs")
.to(HdfsDataSegmentPuller.class)
.in(LazySingleton.class);
binder.bind(Configuration.class).toInstance(new Configuration());
}
private static void bindDeepStorageCassandra(Binder binder)
{
DruidBinders.dataSegmentPullerBinder(binder)
.addBinding("c*")
.to(CassandraDataSegmentPuller.class)
binder.bind(Configuration.class).toInstance(new Configuration());
PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class))
.addBinding("hdfs")
.to(HdfsDataSegmentPusher.class)
.in(LazySingleton.class);
ConfigProvider.bind(binder, CassandraDataSegmentConfig.class);
JsonConfigProvider.bind(binder, "druid.pusher", HdfsDataSegmentPusherConfig.class);
}
}

View File

@ -22,10 +22,13 @@ package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.multibindings.Multibinder;
import com.metamx.metrics.Monitor;
import io.druid.query.Query;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.server.DruidNode;
/**
*/
@ -34,20 +37,14 @@ public class DruidBinders
public static MapBinder<Class<? extends Query>, QueryRunnerFactory> queryRunnerFactoryBinder(Binder binder)
{
return MapBinder.newMapBinder(
binder, new TypeLiteral<Class<? extends Query>>()
{
}, TypeLiteral.get(QueryRunnerFactory.class)
binder, new TypeLiteral<Class<? extends Query>>(){}, TypeLiteral.get(QueryRunnerFactory.class)
);
}
public static MapBinder<Class<? extends Query>, QueryToolChest> queryToolChestBinder(Binder binder)
{
return MapBinder.newMapBinder(
binder, new TypeLiteral<Class<? extends Query>>()
{
}, new TypeLiteral<QueryToolChest>()
{
}
binder, new TypeLiteral<Class<? extends Query>>(){}, new TypeLiteral<QueryToolChest>(){}
);
}
@ -55,4 +52,14 @@ public class DruidBinders
{
return MapBinder.newMapBinder(binder, String.class, DataSegmentPuller.class);
}
public static Multibinder<KeyHolder<DruidNode>> discoveryAnnouncementBinder(Binder binder)
{
return Multibinder.newSetBinder(binder, new TypeLiteral<KeyHolder<DruidNode>>(){});
}
public static Multibinder<Class<? extends Monitor>> metricMonitorBinder(Binder binder)
{
return Multibinder.newSetBinder(binder, new TypeLiteral<Class<? extends Monitor>>(){});
}
}

View File

@ -1,52 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.HdfsDataSegmentPusher;
import io.druid.segment.loading.HdfsDataSegmentPusherConfig;
import org.apache.hadoop.conf.Configuration;
import javax.validation.constraints.NotNull;
/**
*/
public class HdfsDataSegmentPusherProvider implements DataSegmentPusherProvider
{
@JacksonInject
@NotNull
private HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConfig = null;
@JacksonInject
@NotNull
private Configuration config = null;
@JacksonInject
@NotNull
private ObjectMapper jsonMapper = null;
@Override
public DataSegmentPusher get()
{
return new HdfsDataSegmentPusher(hdfsDataSegmentPusherConfig, config, jsonMapper);
}
}

View File

@ -19,22 +19,21 @@
package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.common.logger.Logger;
import io.druid.server.coordination.ServerManager;
import io.druid.server.coordination.ZkCoordinator;
/**
*/
public class HistoricalModule implements Module
public class NodeTypeConfig
{
private static final Logger log = new Logger(HistoricalModule.class);
private final String nodeType;
@Override
public void configure(Binder binder)
public NodeTypeConfig(
String nodeType
)
{
binder.bind(ServerManager.class).in(LazySingleton.class);
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
this.nodeType = nodeType;
}
public String getNodeType()
{
return nodeType;
}
}

View File

@ -22,6 +22,7 @@ package io.druid.guice;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.util.Providers;
import io.druid.initialization.DruidModule;
import io.druid.query.QuerySegmentWalker;
import io.druid.server.QueryServlet;
@ -37,20 +38,11 @@ import java.util.List;
*/
public class QueryableModule implements DruidModule
{
private final Class<? extends QuerySegmentWalker> walkerClass;
public QueryableModule(
Class<? extends QuerySegmentWalker> walkerClass
)
{
this.walkerClass = walkerClass;
}
@Override
public void configure(Binder binder)
{
binder.bind(QueryServlet.class).in(LazySingleton.class);
binder.bind(QuerySegmentWalker.class).to(walkerClass).in(LazySingleton.class);
binder.bind(QuerySegmentWalker.class).toProvider(Providers.<QuerySegmentWalker>of(null));
binder.bind(RequestLogger.class).toProvider(RequestLoggerProvider.class).in(ManageLifecycle.class);
JsonConfigProvider.bind(binder, "druid.request.logging", RequestLoggerProvider.class);
}

View File

@ -1,52 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.S3DataSegmentPusher;
import io.druid.segment.loading.S3DataSegmentPusherConfig;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import javax.validation.constraints.NotNull;
/**
*/
public class S3DataSegmentPusherProvider implements DataSegmentPusherProvider
{
@JacksonInject
@NotNull
private RestS3Service restS3Service = null;
@JacksonInject
@NotNull
private S3DataSegmentPusherConfig config = null;
@JacksonInject
@NotNull
private ObjectMapper jsonMapper = null;
@Override
public DataSegmentPusher get()
{
return new S3DataSegmentPusher(restS3Service, config, jsonMapper);
}
}

View File

@ -22,6 +22,8 @@ package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import com.google.inject.util.Providers;
import io.druid.client.DruidServerConfig;
import io.druid.guice.annotations.Self;
import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
@ -32,23 +34,19 @@ import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DruidServerMetadata;
import javax.annotation.Nullable;
/**
*/
public class StorageNodeModule implements Module
{
private final String nodeType;
public StorageNodeModule(String nodeType)
{
this.nodeType = nodeType;
}
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class);
JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class);
binder.bind(NodeTypeConfig.class).toProvider(Providers.<NodeTypeConfig>of(null));
binder.bind(QueryableIndexFactory.class).to(MMappedQueryableIndexFactory.class).in(LazySingleton.class);
binder.bind(QueryRunnerFactoryConglomerate.class)
@ -58,13 +56,17 @@ public class StorageNodeModule implements Module
@Provides
@LazySingleton
public DruidServerMetadata getMetadata(@Self DruidNode node, DruidServerConfig config)
public DruidServerMetadata getMetadata(@Self DruidNode node, @Nullable NodeTypeConfig nodeType, DruidServerConfig config)
{
if (nodeType == null) {
throw new ProvisionException("Must override the binding for NodeTypeConfig if you want a DruidServerMetadata.");
}
return new DruidServerMetadata(
node.getHost(),
node.getHost(),
config.getMaxSize(),
nodeType,
nodeType.getNodeType(),
config.getTier()
);
}

View File

@ -65,6 +65,11 @@ public class OmniSegmentLoader implements SegmentLoader
}
}
public OmniSegmentLoader withConfig(SegmentLoaderConfig config)
{
return new OmniSegmentLoader(pullers, factory, config);
}
@Override
public boolean isSegmentLoaded(final DataSegment segment)
{

View File

@ -20,6 +20,7 @@
package io.druid.segment.loading;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import org.hibernate.validator.constraints.NotEmpty;
import java.io.File;
@ -58,6 +59,15 @@ public class SegmentLoaderConfig
return infoDir;
}
public SegmentLoaderConfig withLocations(List<StorageLocationConfig> locations)
{
SegmentLoaderConfig retVal = new SegmentLoaderConfig();
retVal.locations = Lists.newArrayList(locations);
retVal.deleteOnRemove = this.deleteOnRemove;
retVal.infoDir = this.infoDir;
return retVal;
}
@Override
public String toString()
{

View File

@ -42,11 +42,23 @@ public class StorageLocationConfig
return path;
}
public StorageLocationConfig setPath(File path)
{
this.path = path;
return this;
}
public long getMaxSize()
{
return maxSize;
}
public StorageLocationConfig setMaxSize(long maxSize)
{
this.maxSize = maxSize;
return this;
}
@Override
public String toString()
{

View File

@ -94,7 +94,7 @@ public class ServerManager implements QuerySegmentWalker
this.exec = exec;
this.dataSources = new HashMap<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>>();
this.dataSources = new HashMap<>();
}
public Map<String, Long> getDataSourceSizes()

View File

@ -20,7 +20,6 @@
package io.druid.server.initialization;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.inject.Binder;
import com.google.inject.ConfigurationException;
@ -44,13 +43,13 @@ import io.druid.guice.LazySingleton;
import io.druid.guice.annotations.JSR311Resource;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode;
import io.druid.server.StatusResource;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import javax.servlet.ServletException;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -60,22 +59,6 @@ public class JettyServerModule extends JerseyServletModule
{
private static final Logger log = new Logger(JettyServerModule.class);
private final JettyServerInitializer initializer;
private final List<Class<?>> resources = Lists.newArrayList();
public JettyServerModule(
JettyServerInitializer initializer
)
{
this.initializer = initializer;
}
public JettyServerModule addResource(Class<?> resource)
{
resources.add(resource);
return this;
}
@Override
protected void configureServlets()
{
@ -87,10 +70,8 @@ public class JettyServerModule extends JerseyServletModule
binder.bind(DruidGuiceContainer.class).in(Scopes.SINGLETON);
serve("/*").with(DruidGuiceContainer.class);
for (Class<?> resource : resources) {
Jerseys.addResource(binder, resource);
binder.bind(resource).in(LazySingleton.class);
}
Jerseys.addResource(binder, StatusResource.class);
binder.bind(StatusResource.class).in(LazySingleton.class);
binder.bind(Key.get(Server.class, Names.named("ForTheEagerness"))).to(Server.class).asEagerSingleton();
}
@ -121,6 +102,8 @@ public class JettyServerModule extends JerseyServletModule
@Provides @LazySingleton
public Server getServer(Injector injector, Lifecycle lifecycle, @Self DruidNode node, ServerConfig config)
{
JettyServerInitializer initializer = injector.getInstance(JettyServerInitializer.class);
final Server server = makeJettyServer(node, config);
try {
initializer.initialize(server, injector);

View File

@ -19,11 +19,10 @@
package io.druid.server.metrics;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
@ -34,14 +33,12 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import io.druid.concurrent.Execs;
import io.druid.guice.DruidBinders;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.JsonConfigurator;
import io.druid.guice.LazySingleton;
import io.druid.guice.ManageLifecycle;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.Set;
/**
* Sets up the {@link MonitorScheduler} to monitor things on a regular schedule. {@link Monitor}s must be explicitly
@ -51,40 +48,18 @@ public class MetricsModule implements Module
{
private static final Logger log = new Logger(MetricsModule.class);
private final List<Class<? extends Monitor>> monitors = new CopyOnWriteArrayList<Class<? extends Monitor>>();
public boolean configured = false;
public MetricsModule register(Class<? extends Monitor> monitorClazz)
public static void register(Binder binder, Class<? extends Monitor> monitorClazz)
{
synchronized (monitors) {
Preconditions.checkState(!configured, "Cannot register monitor[%s] after configuration.", monitorClazz);
}
monitors.add(monitorClazz);
return this;
}
@Inject
public void setProperties(Properties props, JsonConfigurator configurator)
{
final MonitorsConfig config = configurator.configurate(
props,
"druid.monitoring",
MonitorsConfig.class
);
for (Class<? extends Monitor> monitorClazz : config.getMonitors()) {
register(monitorClazz);
}
DruidBinders.metricMonitorBinder(binder).addBinding().toInstance(monitorClazz);
}
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.monitoring", DruidMonitorSchedulerConfig.class);
JsonConfigProvider.bind(binder, "druid.monitoring", MonitorsConfig.class);
for (Class<? extends Monitor> monitor : monitors) {
binder.bind(monitor).in(LazySingleton.class);
}
DruidBinders.metricMonitorBinder(binder); // get the binder so that it will inject the empty set at a minimum.
// Instantiate eagerly so that we get everything registered and put into the Lifecycle
binder.bind(Key.get(MonitorScheduler.class, Names.named("ForTheEagerness")))
@ -96,21 +71,21 @@ public class MetricsModule implements Module
@ManageLifecycle
public MonitorScheduler getMonitorScheduler(
Supplier<DruidMonitorSchedulerConfig> config,
MonitorsConfig monitorsConfig,
Set<Class<? extends Monitor>> monitorSet,
ServiceEmitter emitter,
Injector injector
)
{
List<Monitor> monitors = Lists.newArrayList();
for (Key<?> key : injector.getBindings().keySet()) {
if (Monitor.class.isAssignableFrom(key.getTypeLiteral().getRawType())) {
final Monitor monitor = (Monitor) injector.getInstance(key);
for (Class<? extends Monitor> monitorClass : Iterables.concat(monitorsConfig.getMonitors(), monitorSet)) {
final Monitor monitor = injector.getInstance(monitorClass);
log.info("Adding monitor[%s]", monitor);
monitors.add(monitor);
}
}
return new MonitorScheduler(
config.get(),

View File

@ -20,24 +20,26 @@
package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.client.BrokerServerView;
import io.druid.client.CachingClusteredClient;
import io.druid.client.TimelineServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheMonitor;
import io.druid.curator.CuratorModule;
import io.druid.client.cache.CacheProvider;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.BrokerModule;
import io.druid.guice.HttpClientModule;
import io.druid.guice.LifecycleModule;
import io.druid.guice.QueryToolChestModule;
import io.druid.guice.QueryableModule;
import io.druid.guice.ServerModule;
import io.druid.guice.ServerViewModule;
import io.druid.guice.annotations.Client;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.ClientQuerySegmentWalker;
import io.druid.server.StatusResource;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.initialization.JettyServerModule;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.metrics.MetricsModule;
import java.util.List;
@ -61,20 +63,25 @@ public class CliBroker extends ServerRunnable
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new LifecycleModule(),
EmitterModule.class,
HttpClientModule.global(),
CuratorModule.class,
new MetricsModule().register(CacheMonitor.class),
new DiscoveryModule().register(Self.class),
new ServerModule(),
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class),
new QueryableModule(ClientQuerySegmentWalker.class),
new QueryToolChestModule(),
new ServerViewModule(),
new HttpClientModule("druid.broker.http", Client.class),
new BrokerModule()
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);
binder.bind(CachingClusteredClient.class).in(LazySingleton.class);
binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class);
binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class);
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
DiscoveryModule.register(binder, Self.class);
MetricsModule.register(binder, CacheMonitor.class);
}
}
);
}
}

View File

@ -19,41 +19,40 @@
package io.druid.cli;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.curator.CuratorModule;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.CoordinatorModule;
import io.druid.guice.DbConnectorModule;
import io.druid.guice.HttpClientModule;
import io.druid.guice.IndexingServiceDiscoveryModule;
import io.druid.guice.JacksonConfigManagerModule;
import io.druid.db.DatabaseRuleManager;
import io.druid.db.DatabaseRuleManagerConfig;
import io.druid.db.DatabaseRuleManagerProvider;
import io.druid.db.DatabaseSegmentManager;
import io.druid.db.DatabaseSegmentManagerConfig;
import io.druid.db.DatabaseSegmentManagerProvider;
import io.druid.guice.ConfigProvider;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ServerModule;
import io.druid.guice.ServerViewModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.server.StatusResource;
import io.druid.server.http.InfoResource;
import io.druid.server.http.MasterRedirectInfo;
import io.druid.server.http.MasterResource;
import io.druid.server.http.RedirectFilter;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.http.RedirectInfo;
import io.druid.server.http.RedirectServlet;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.initialization.JettyServerModule;
import io.druid.server.master.DruidMaster;
import io.druid.server.metrics.MetricsModule;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
import io.druid.server.master.DruidMasterConfig;
import io.druid.server.master.LoadQueueTaskMaster;
import org.apache.curator.framework.CuratorFramework;
import java.util.List;
@ -76,44 +75,50 @@ public class CliCoordinator extends ServerRunnable
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new LifecycleModule().register(DruidMaster.class),
EmitterModule.class,
HttpClientModule.global(),
DbConnectorModule.class,
JacksonConfigManagerModule.class,
CuratorModule.class,
new MetricsModule(),
new DiscoveryModule().register(Self.class),
new ServerModule(),
new JettyServerModule(new CoordinatorJettyServerInitializer())
.addResource(InfoResource.class)
.addResource(MasterResource.class)
.addResource(StatusResource.class),
new ServerViewModule(),
new IndexingServiceDiscoveryModule(),
CoordinatorModule.class
);
}
private static class CoordinatorJettyServerInitializer implements JettyServerInitializer
new Module()
{
@Override
public void initialize(Server server, Injector injector)
public void configure(Binder binder)
{
ResourceHandler resourceHandler = new ResourceHandler();
resourceHandler.setResourceBase(DruidMaster.class.getClassLoader().getResource("static").toExternalForm());
ConfigProvider.bind(binder, DruidMasterConfig.class);
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.setContextPath("/");
JsonConfigProvider.bind(binder, "druid.manager.segment", DatabaseSegmentManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.rules", DatabaseRuleManagerConfig.class);
HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{resourceHandler, root, new DefaultHandler()});
server.setHandler(handlerList);
binder.bind(RedirectServlet.class).in(LazySingleton.class);
binder.bind(RedirectFilter.class).in(LazySingleton.class);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
binder.bind(DatabaseSegmentManager.class)
.toProvider(DatabaseSegmentManagerProvider.class)
.in(ManageLifecycle.class);
binder.bind(DatabaseRuleManager.class)
.toProvider(DatabaseRuleManagerProvider.class)
.in(ManageLifecycle.class);
binder.bind(IndexingServiceClient.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(MasterRedirectInfo.class).in(LazySingleton.class);
binder.bind(DruidMaster.class);
LifecycleModule.register(binder, DruidMaster.class);
DiscoveryModule.register(binder, Self.class);
binder.bind(JettyServerInitializer.class).toInstance(new CoordinatorJettyServerInitializer());
Jerseys.addResource(binder, InfoResource.class);
Jerseys.addResource(binder, MasterResource.class);
}
@Provides
@LazySingleton
public LoadQueueTaskMaster getLoadQueueTaskMaster(
CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, DruidMasterConfig config
)
{
return new LoadQueueTaskMaster(curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"), config);
}
}
);
}
}

View File

@ -20,25 +20,18 @@
package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.curator.CuratorModule;
import io.druid.guice.AWSModule;
import io.druid.guice.AnnouncerModule;
import io.druid.guice.DataSegmentPullerModule;
import io.druid.guice.DruidProcessingModule;
import io.druid.guice.HistoricalModule;
import io.druid.guice.HttpClientModule;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.QueryRunnerFactoryModule;
import io.druid.guice.QueryableModule;
import io.druid.guice.ServerModule;
import io.druid.guice.StorageNodeModule;
import io.druid.server.StatusResource;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.NodeTypeConfig;
import io.druid.query.QuerySegmentWalker;
import io.druid.server.coordination.ServerManager;
import io.druid.server.coordination.ZkCoordinator;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.initialization.JettyServerModule;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.metrics.MetricsModule;
import io.druid.server.metrics.ServerMonitor;
@ -63,22 +56,21 @@ public class CliHistorical extends ServerRunnable
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new LifecycleModule().register(ZkCoordinator.class),
EmitterModule.class,
HttpClientModule.global(),
CuratorModule.class,
AnnouncerModule.class,
DruidProcessingModule.class,
AWSModule.class,
DataSegmentPullerModule.class,
new MetricsModule().register(ServerMonitor.class),
new ServerModule(),
new StorageNodeModule("historical"),
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class),
new QueryableModule(ServerManager.class),
new QueryRunnerFactoryModule(),
HistoricalModule.class
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(ServerManager.class).in(LazySingleton.class);
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
binder.bind(QuerySegmentWalker.class).to(ServerManager.class).in(LazySingleton.class);
LifecycleModule.register(binder, ZkCoordinator.class);
MetricsModule.register(binder, ServerMonitor.class);
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("historical"));
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
}
}
);
}
}

View File

@ -20,32 +20,27 @@
package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.curator.CuratorModule;
import io.druid.guice.AWSModule;
import io.druid.guice.HttpClientModule;
import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.MiddleManagerModule;
import io.druid.guice.ServerModule;
import io.druid.guice.TaskLogsModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.indexing.coordinator.ForkingTaskRunner;
import io.druid.indexing.coordinator.TaskRunner;
import io.druid.indexing.worker.Worker;
import io.druid.indexing.worker.WorkerCuratorCoordinator;
import io.druid.indexing.worker.WorkerTaskMonitor;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.indexing.worker.http.WorkerResource;
import io.druid.server.StatusResource;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.DruidNode;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.initialization.JettyServerModule;
import io.druid.server.metrics.MetricsModule;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
import java.util.List;
@ -68,34 +63,38 @@ public class CliMiddleManager extends ServerRunnable
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new LifecycleModule().register(WorkerTaskMonitor.class),
EmitterModule.class,
HttpClientModule.global(),
CuratorModule.class,
new MetricsModule(),
new ServerModule(),
new JettyServerModule(new MiddleManagerJettyServerInitializer())
.addResource(StatusResource.class)
.addResource(WorkerResource.class),
new AWSModule(),
new TaskLogsModule(),
new MiddleManagerModule()
);
}
private static class MiddleManagerJettyServerInitializer implements JettyServerInitializer
new Module()
{
@Override
public void initialize(Server server, Injector injector)
public void configure(Binder binder)
{
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder);
final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{root, new DefaultHandler()});
server.setHandler(handlerList);
JsonConfigProvider.bind(binder, "druid.worker", WorkerConfig.class);
binder.bind(TaskRunner.class).to(ForkingTaskRunner.class);
binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);
binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class);
binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class);
LifecycleModule.register(binder, WorkerTaskMonitor.class);
binder.bind(JettyServerInitializer.class).toInstance(new MiddleManagerJettyServerInitializer());
Jerseys.addResource(binder, WorkerResource.class);
}
@Provides
@LazySingleton
public Worker getWorker(@Self DruidNode node, WorkerConfig config)
{
return new Worker(
node.getHost(),
config.getIp(),
config.getCapacity(),
config.getVersion()
);
}
}
);
}
}

View File

@ -20,28 +20,56 @@
package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.curator.CuratorModule;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.AWSModule;
import io.druid.guice.DbConnectorModule;
import io.druid.guice.HttpClientModule;
import io.druid.guice.JacksonConfigManagerModule;
import io.druid.guice.LifecycleModule;
import io.druid.guice.OverlordModule;
import io.druid.guice.ServerModule;
import io.druid.guice.TaskLogsModule;
import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.JacksonConfigProvider;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.ListProvider;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.PolyBind;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskLogs;
import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer;
import io.druid.indexing.coordinator.DbTaskStorage;
import io.druid.indexing.coordinator.ForkingTaskRunnerFactory;
import io.druid.indexing.coordinator.HeapMemoryTaskStorage;
import io.druid.indexing.coordinator.IndexerDBCoordinator;
import io.druid.indexing.coordinator.RemoteTaskRunnerFactory;
import io.druid.indexing.coordinator.TaskLockbox;
import io.druid.indexing.coordinator.TaskMaster;
import io.druid.indexing.coordinator.TaskQueue;
import io.druid.indexing.coordinator.TaskRunnerFactory;
import io.druid.indexing.coordinator.TaskStorage;
import io.druid.indexing.coordinator.TaskStorageQueryAdapter;
import io.druid.indexing.coordinator.http.IndexerCoordinatorResource;
import io.druid.server.StatusResource;
import io.druid.indexing.coordinator.http.OverlordRedirectInfo;
import io.druid.indexing.coordinator.scaling.AutoScalingStrategy;
import io.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy;
import io.druid.indexing.coordinator.scaling.NoopAutoScalingStrategy;
import io.druid.indexing.coordinator.scaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory;
import io.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactoryImpl;
import io.druid.indexing.coordinator.scaling.ResourceManagementStrategy;
import io.druid.indexing.coordinator.scaling.SimpleResourceManagementConfig;
import io.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy;
import io.druid.indexing.coordinator.setup.WorkerSetupData;
import io.druid.server.http.RedirectFilter;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.http.RedirectInfo;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.initialization.JettyServerModule;
import io.druid.server.metrics.MetricsModule;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
@ -75,24 +103,103 @@ public class CliOverlord extends ServerRunnable
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new LifecycleModule(),
EmitterModule.class,
HttpClientModule.global(),
CuratorModule.class,
new MetricsModule(),
new ServerModule(),
new AWSModule(),
new DbConnectorModule(),
new JacksonConfigManagerModule(),
new JettyServerModule(new OverlordJettyServerInitializer())
.addResource(IndexerCoordinatorResource.class)
.addResource(StatusResource.class),
new DiscoveryModule(),
new TaskLogsModule(),
new OverlordModule()
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(TaskMaster.class).in(ManageLifecycle.class);
binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class);
binder.bind(new TypeLiteral<List<TaskLogStreamer>>(){})
.toProvider(
new ListProvider<TaskLogStreamer>()
.add(TaskRunnerTaskLogStreamer.class)
.add(TaskLogs.class)
)
.in(LazySingleton.class);
binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class);
binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
binder.bind(TaskQueue.class).in(LazySingleton.class); // Lifecycle managed by TaskMaster instead
binder.bind(IndexerDBCoordinator.class).in(LazySingleton.class);
binder.bind(TaskLockbox.class).in(LazySingleton.class);
binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class);
binder.bind(ResourceManagementSchedulerFactory.class)
.to(ResourceManagementSchedulerFactoryImpl.class)
.in(LazySingleton.class);
configureTaskStorage(binder);
configureRunners(binder);
configureAutoscale(binder);
binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(OverlordRedirectInfo.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).toInstance(new OverlordJettyServerInitializer());
Jerseys.addResource(binder, IndexerCoordinatorResource.class);
}
private void configureTaskStorage(Binder binder)
{
PolyBind.createChoice(
binder, "druid.indexer.storage.type", Key.get(TaskStorage.class), Key.get(HeapMemoryTaskStorage.class)
);
final MapBinder<String, TaskStorage> storageBinder = PolyBind.optionBinder(binder, Key.get(TaskStorage.class));
storageBinder.addBinding("local").to(HeapMemoryTaskStorage.class);
binder.bind(HeapMemoryTaskStorage.class).in(LazySingleton.class);
storageBinder.addBinding("db").to(DbTaskStorage.class);
binder.bind(DbTaskStorage.class).in(LazySingleton.class);
}
private void configureRunners(Binder binder)
{
PolyBind.createChoice(
binder, "druid.indexer.runner.type", Key.get(TaskRunnerFactory.class), Key.get(ForkingTaskRunnerFactory.class)
);
final MapBinder<String, TaskRunnerFactory> biddy = PolyBind.optionBinder(binder, Key.get(TaskRunnerFactory.class));
IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder);
biddy.addBinding("local").to(ForkingTaskRunnerFactory.class);
binder.bind(ForkingTaskRunnerFactory.class).in(LazySingleton.class);
biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
}
private void configureAutoscale(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class);
binder.bind(ResourceManagementStrategy.class).to(SimpleResourceManagementStrategy.class).in(LazySingleton.class);
JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null);
PolyBind.createChoice(
binder,
"druid.indexer.autoscale.strategy",
Key.get(AutoScalingStrategy.class),
Key.get(NoopAutoScalingStrategy.class)
);
final MapBinder<String, AutoScalingStrategy> autoScalingBinder = PolyBind.optionBinder(
binder, Key.get(AutoScalingStrategy.class)
);
autoScalingBinder.addBinding("ec2").to(EC2AutoScalingStrategy.class);
binder.bind(EC2AutoScalingStrategy.class).in(LazySingleton.class);
autoScalingBinder.addBinding("noop").to(NoopAutoScalingStrategy.class);
binder.bind(NoopAutoScalingStrategy.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class);
}
}
);
}
/**
*/
private static class OverlordJettyServerInitializer implements JettyServerInitializer
{
@Override

View File

@ -21,41 +21,47 @@ package io.druid.cli;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.airlift.command.Arguments;
import io.airlift.command.Command;
import io.airlift.command.Option;
import io.druid.curator.CuratorModule;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.AWSModule;
import io.druid.guice.AnnouncerModule;
import io.druid.guice.DataSegmentPullerModule;
import io.druid.guice.DataSegmentPusherModule;
import io.druid.guice.DruidProcessingModule;
import io.druid.guice.HttpClientModule;
import io.druid.guice.IndexingServiceDiscoveryModule;
import io.druid.guice.LifecycleModule;
import io.druid.guice.PeonModule;
import io.druid.guice.QueryRunnerFactoryModule;
import io.druid.guice.QueryableModule;
import io.druid.guice.ServerModule;
import io.druid.guice.ServerViewModule;
import io.druid.guice.StorageNodeModule;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.NodeTypeConfig;
import io.druid.guice.PolyBind;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.indexing.common.index.EventReceivingChatHandlerProvider;
import io.druid.indexing.common.index.NoopChatHandlerProvider;
import io.druid.indexing.coordinator.TaskRunner;
import io.druid.indexing.coordinator.ThreadPoolTaskRunner;
import io.druid.indexing.worker.executor.ChatHandlerResource;
import io.druid.indexing.worker.executor.ExecutorLifecycle;
import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import io.druid.initialization.LogLevelAdjuster;
import io.druid.server.StatusResource;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.initialization.Initialization;
import io.druid.server.initialization.JettyServerModule;
import io.druid.server.metrics.MetricsModule;
import io.druid.query.QuerySegmentWalker;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.S3DataSegmentKiller;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.server.initialization.JettyServerInitializer;
import java.io.File;
import java.util.Arrays;
import java.util.List;
/**
@ -87,32 +93,59 @@ public class CliPeon implements Runnable
{
return Initialization.makeInjectorWithModules(
injector,
ImmutableList.of(
new LifecycleModule(),
EmitterModule.class,
HttpClientModule.global(),
CuratorModule.class,
new MetricsModule(),
new ServerModule(),
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class)
.addResource(ChatHandlerResource.class),
new DiscoveryModule(),
new ServerViewModule(),
new StorageNodeModule(nodeType),
new DataSegmentPullerModule(),
new DataSegmentPusherModule(),
new AnnouncerModule(),
new DruidProcessingModule(),
new QueryableModule(ThreadPoolTaskRunner.class),
new QueryRunnerFactoryModule(),
new IndexingServiceDiscoveryModule(),
new AWSModule(),
new PeonModule(
ImmutableList.<Object>of(
new Module()
{
@Override
public void configure(Binder binder)
{
PolyBind.createChoice(
binder,
"druid.indexer.task.chathandler.type",
Key.get(ChatHandlerProvider.class),
Key.get(NoopChatHandlerProvider.class)
);
final MapBinder<String, ChatHandlerProvider> handlerProviderBinder = PolyBind.optionBinder(
binder, Key.get(ChatHandlerProvider.class)
);
handlerProviderBinder.addBinding("curator").to(EventReceivingChatHandlerProvider.class);
handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class);
binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
JsonConfigProvider.bind(binder, "druid.worker.taskActionClient.retry", RetryPolicyConfig.class);
binder.bind(TaskActionClientFactory.class)
.to(RemoteTaskActionClientFactory.class)
.in(LazySingleton.class);
binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
binder.bind(DataSegmentKiller.class).to(S3DataSegmentKiller.class).in(LazySingleton.class);
binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
binder.bind(ExecutorLifecycleConfig.class).toInstance(
new ExecutorLifecycleConfig()
.setTaskFile(new File(taskAndStatusFile.get(0)))
.setStatusFile(new File(taskAndStatusFile.get(1)))
)
);
binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class);
binder.bind(QuerySegmentWalker.class).to(ThreadPoolTaskRunner.class);
binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class);
// Override the default SegmentLoaderConfig because we don't actually care about the
// configuration based locations. This will override them anyway. This is also stopping
// configuration of other parameters, but I don't think that's actually a problem.
// Note, if that is actually not a problem, then that probably means we have the wrong abstraction.
binder.bind(SegmentLoaderConfig.class)
.toInstance(new SegmentLoaderConfig().withLocations(Arrays.<StorageLocationConfig>asList()));
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
Jerseys.addResource(binder, ChatHandlerResource.class);
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(nodeType));
}
}
)
);
}

View File

@ -22,25 +22,7 @@ package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.curator.CuratorModule;
import io.druid.guice.AWSModule;
import io.druid.guice.AnnouncerModule;
import io.druid.guice.DataSegmentPusherModule;
import io.druid.guice.DbConnectorModule;
import io.druid.guice.DruidProcessingModule;
import io.druid.guice.HttpClientModule;
import io.druid.guice.LifecycleModule;
import io.druid.guice.QueryRunnerFactoryModule;
import io.druid.guice.QueryableModule;
import io.druid.guice.RealtimeModule;
import io.druid.guice.ServerModule;
import io.druid.guice.ServerViewModule;
import io.druid.guice.StorageNodeModule;
import io.druid.segment.realtime.RealtimeManager;
import io.druid.server.StatusResource;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.initialization.JettyServerModule;
import io.druid.server.metrics.MetricsModule;
import java.util.List;
@ -63,24 +45,7 @@ public class CliRealtime extends ServerRunnable
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new LifecycleModule(),
EmitterModule.class,
DbConnectorModule.class,
HttpClientModule.global(),
CuratorModule.class,
AnnouncerModule.class,
DruidProcessingModule.class,
AWSModule.class,
DataSegmentPusherModule.class,
new MetricsModule(),
new ServerModule(),
new StorageNodeModule("realtime"),
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class),
new ServerViewModule(),
new QueryableModule(RealtimeManager.class),
new QueryRunnerFactoryModule(),
RealtimeModule.class
new RealtimeModule()
);
}
}

View File

@ -19,22 +19,33 @@
package io.druid.cli;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.metamx.common.logger.Logger;
import druid.examples.guice.RealtimeExampleModule;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.airlift.command.Command;
import io.druid.guice.DruidProcessingModule;
import io.druid.guice.LifecycleModule;
import io.druid.guice.QueryRunnerFactoryModule;
import io.druid.guice.QueryableModule;
import io.druid.guice.ServerModule;
import io.druid.guice.StorageNodeModule;
import io.druid.segment.realtime.RealtimeManager;
import io.druid.server.StatusResource;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.initialization.JettyServerModule;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import io.druid.client.ServerView;
import io.druid.guice.NoopSegmentPublisherProvider;
import io.druid.guice.RealtimeModule;
import io.druid.initialization.DruidModule;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
/**
*/
@ -54,17 +65,104 @@ public class CliRealtimeExample extends ServerRunnable
@Override
protected List<Object> getModules()
{
return ImmutableList.of(
new LifecycleModule(),
EmitterModule.class,
DruidProcessingModule.class,
new ServerModule(),
new StorageNodeModule("realtime"),
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class),
new QueryableModule(RealtimeManager.class),
new QueryRunnerFactoryModule(),
RealtimeExampleModule.class
return ImmutableList.<Object>of(
new RealtimeModule(),
new DruidModule()
{
@Override
public void configure(Binder binder)
{
binder.bind(SegmentPublisher.class).toProvider(NoopSegmentPublisherProvider.class);
binder.bind(DataSegmentPusher.class).to(NoopDataSegmentPusher.class);
binder.bind(DataSegmentAnnouncer.class).to(NoopDataSegmentAnnouncer.class);
binder.bind(InventoryView.class).to(NoopInventoryView.class);
binder.bind(ServerView.class).to(NoopServerView.class);
}
@Override
public List<Module> getJacksonModules()
{
return Arrays.<Module>asList(
new SimpleModule("RealtimeExampleModule")
.registerSubtypes(
new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
new NamedType(FlightsFirehoseFactory.class, "flights"),
new NamedType(RandomFirehoseFactory.class, "rand"),
new NamedType(WebFirehoseFactory.class, "webstream")
)
);
}
}
);
}
private static class NoopServerView implements ServerView
{
@Override
public void registerServerCallback(
Executor exec, ServerCallback callback
)
{
// do nothing
}
@Override
public void registerSegmentCallback(
Executor exec, SegmentCallback callback
)
{
// do nothing
}
}
private static class NoopInventoryView implements InventoryView
{
@Override
public DruidServer getInventoryValue(String string)
{
return null;
}
@Override
public Iterable<DruidServer> getInventory()
{
return ImmutableList.of();
}
}
private static class NoopDataSegmentPusher implements DataSegmentPusher
{
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
return segment;
}
}
private static class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer
{
@Override
public void announceSegment(DataSegment segment) throws IOException
{
// do nothing
}
@Override
public void unannounceSegment(DataSegment segment) throws IOException
{
// do nothing
}
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
// do nothing
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
// do nothing
}
}
}

View File

@ -0,0 +1,60 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import io.druid.server.http.RedirectFilter;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.master.DruidMaster;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
/**
*/
class CoordinatorJettyServerInitializer implements JettyServerInitializer
{
@Override
public void initialize(Server server, Injector injector)
{
ResourceHandler resourceHandler = new ResourceHandler();
resourceHandler.setResourceBase(DruidMaster.class.getClassLoader().getResource("static").toExternalForm());
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.setContextPath("/");
HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{resourceHandler, root, new DefaultHandler()});
server.setHandler(handlerList);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
}
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.initialization;
package io.druid.cli;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
@ -29,12 +29,35 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.util.Modules;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.curator.CuratorModule;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.AWSModule;
import io.druid.guice.AnnouncerModule;
import io.druid.guice.DataSegmentPusherPullerModule;
import io.druid.guice.DbConnectorModule;
import io.druid.guice.DruidProcessingModule;
import io.druid.guice.DruidSecondaryModule;
import io.druid.guice.HttpClientModule;
import io.druid.guice.IndexingServiceDiscoveryModule;
import io.druid.guice.JacksonConfigManagerModule;
import io.druid.guice.LifecycleModule;
import io.druid.guice.QueryRunnerFactoryModule;
import io.druid.guice.QueryableModule;
import io.druid.guice.ServerModule;
import io.druid.guice.ServerViewModule;
import io.druid.guice.StorageNodeModule;
import io.druid.guice.TaskLogsModule;
import io.druid.guice.annotations.Client;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.initialization.DruidModule;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.initialization.ExtensionsConfig;
import io.druid.server.initialization.JettyServerModule;
import io.druid.server.metrics.MetricsModule;
import io.tesla.aether.TeslaAether;
import io.tesla.aether.internal.DefaultTeslaAether;
import org.eclipse.aether.artifact.Artifact;
@ -171,8 +194,33 @@ public class Initialization
}
}
public static Injector makeInjectorWithModules(final Injector baseInjector, List<Object> modules)
public static Injector makeInjectorWithModules(final Injector baseInjector, Iterable<Object> modules)
{
final ModuleList defaultModules = new ModuleList(baseInjector);
defaultModules.addModules(
new LifecycleModule(),
EmitterModule.class,
HttpClientModule.global(),
new HttpClientModule("druid.broker.http", Client.class),
new CuratorModule(),
new AnnouncerModule(),
new DruidProcessingModule(),
new AWSModule(),
new MetricsModule(),
new ServerModule(),
new StorageNodeModule(),
new JettyServerModule(),
new QueryableModule(),
new QueryRunnerFactoryModule(),
new DiscoveryModule(),
new ServerViewModule(),
new DbConnectorModule(),
new JacksonConfigManagerModule(),
new IndexingServiceDiscoveryModule(),
new DataSegmentPusherPullerModule(),
new TaskLogsModule()
);
ModuleList actualModules = new ModuleList(baseInjector);
actualModules.addModule(DruidSecondaryModule.class);
for (Object module : modules) {
@ -184,7 +232,7 @@ public class Initialization
actualModules.addModule(module);
}
return Guice.createInjector(actualModules.getModules());
return Guice.createInjector(Modules.override(defaultModules.getModules()).with(actualModules.getModules()));
}
private static class ModuleList
@ -233,6 +281,13 @@ public class Initialization
}
}
public void addModules(Object... object)
{
for (Object o : object) {
addModule(o);
}
}
private DruidModule registerJacksonModules(DruidModule module)
{
for (com.fasterxml.jackson.databind.Module jacksonModule : module.getJacksonModules()) {

View File

@ -33,7 +33,6 @@ import io.druid.guice.JsonConfigProvider;
import io.druid.jackson.JacksonModule;
import io.druid.server.initialization.ConfigModule;
import io.druid.server.initialization.ExtensionsConfig;
import io.druid.server.initialization.Initialization;
import io.druid.server.initialization.PropertiesModule;
import java.util.List;

View File

@ -0,0 +1,50 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
/**
*/
class MiddleManagerJettyServerInitializer implements JettyServerInitializer
{
@Override
public void initialize(Server server, Injector injector)
{
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{root, new DefaultHandler()});
server.setHandler(handlerList);
}
}

View File

@ -25,7 +25,6 @@ import com.google.inject.Injector;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.druid.initialization.LogLevelAdjuster;
import io.druid.server.initialization.Initialization;
import java.util.List;
@ -56,7 +55,9 @@ public abstract class ServerRunnable implements Runnable
try {
LogLevelAdjuster.register();
final Injector injector = Initialization.makeInjectorWithModules(baseInjector, getModules());
final Injector injector = Initialization.makeInjectorWithModules(
baseInjector, getModules()
);
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
try {

View File

@ -98,7 +98,11 @@ public class ConvertProperties implements Runnable
new Rename("druid.indexer.rowFlushBoundary", "druid.indexer.task.rowFlushBoundary"),
new Rename("druid.worker.taskActionClient.retry.minWaitMillis", "druid.worker.taskActionClient.retry.minWait"),
new Rename("druid.worker.taskActionClient.retry.maxWaitMillis", "druid.worker.taskActionClient.retry.maxWait"),
new Rename("druid.master.merger.service", "druid.selectors.indexing.serviceName")
new Rename("druid.master.merger.service", "druid.selectors.indexing.serviceName"),
new DataSegmentPusherDefaultConverter(),
new Rename("druid.pusher.hdfs.storageDirectory", "druid.pusher.storageDirectory"),
new Rename("druid.pusher.cassandra.host", "druid.pusher.host"),
new Rename("druid.pusher.cassandra.keySpace", "druid.pusher.keySpace")
);
@Option(name = "-f", title = "file", description = "The properties file to convert", required = true)
@ -150,6 +154,10 @@ public class ConvertProperties implements Runnable
}
}
updatedProps.setProperty(
"druid.monitoring.monitors", "[\"io.druid.server.metrics.ServerMonitor\", \"com.metamx.metrics.SysMonitor\"]"
);
try (Writer out = new OutputStreamWriter(new FileOutputStream(outFile), Charsets.UTF_8))
{
updatedProps.store(out, null);

View File

@ -0,0 +1,70 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli.convert;
import com.google.api.client.util.Maps;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
*/
public class DataSegmentPusherDefaultConverter implements PropertyConverter
{
Set<String> propertiesHandled = Sets.newHashSet("druid.pusher.local", "druid.pusher.cassandra", "druid.pusher.hdfs");
@Override
public boolean canHandle(String property)
{
return propertiesHandled.contains(property) || property.startsWith("druid.pusher.s3");
}
@Override
public Map<String, String> convert(Properties props)
{
String type = null;
if (Boolean.parseBoolean(props.getProperty("druid.pusher.local", "false"))) {
type = "local";
}
else if (Boolean.parseBoolean(props.getProperty("druid.pusher.cassandra", "false"))) {
type = "c*";
}
else if (Boolean.parseBoolean(props.getProperty("druid.pusher.hdfs", "false"))) {
type = "hdfs";
}
if (type != null) {
return ImmutableMap.of("druid.pusher.type", type);
}
// It's an s3 property, which means we need to set the type and convert the other values.
Map<String, String> retVal = Maps.newHashMap();
retVal.put("druid.pusher.type", type);
retVal.putAll(new Rename("druid.pusher.s3.bucket", "druid.pusher.bucket").convert(props));
retVal.putAll(new Rename("druid.pusher.s3.baseKey", "druid.pusher.baseKey").convert(props));
retVal.putAll(new Rename("druid.pusher.s3.disableAcl", "druid.pusher.disableAcl").convert(props));
return retVal;
}
}

View File

@ -19,27 +19,27 @@
package io.druid.guice;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.metamx.common.logger.Logger;
import io.druid.cli.QueryJettyServerInitializer;
import io.druid.initialization.DruidModule;
import io.druid.query.QuerySegmentWalker;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.RealtimeManager;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import io.druid.server.initialization.JettyServerInitializer;
import java.util.Arrays;
import java.util.List;
/**
*/
*/
public class RealtimeModule implements DruidModule
{
private static final Logger log = new Logger(RealtimeModule.class);
@Override
public void configure(Binder binder)
{
@ -47,18 +47,19 @@ public class RealtimeModule implements DruidModule
binder.bind(SegmentPublisher.class).toProvider(SegmentPublisherProvider.class);
JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);
binder.bind(
new TypeLiteral<List<FireDepartment>>()
{
}
).toProvider(FireDepartmentsProvider.class).in(LazySingleton.class);
binder.bind(RealtimeManager.class).in(ManageLifecycle.class);
binder.bind(new TypeLiteral<List<FireDepartment>>(){})
.toProvider(FireDepartmentsProvider.class)
.in(LazySingleton.class);
binder.bind(QuerySegmentWalker.class).to(RealtimeManager.class).in(ManageLifecycle.class);
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("realtime"));
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
}
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
public List<? extends Module> getJacksonModules()
{
return Arrays.<com.fasterxml.jackson.databind.Module>asList(
return Arrays.<Module>asList(
new SimpleModule("RealtimeModule")
.registerSubtypes(
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2")