diff --git a/cassandra-storage/pom.xml b/cassandra-storage/pom.xml new file mode 100644 index 00000000000..6970489ebeb --- /dev/null +++ b/cassandra-storage/pom.xml @@ -0,0 +1,53 @@ + + + + + 4.0.0 + com.metamx.druid + druid-cassandra-storage + druid-cassandra-storage + druid-cassandra-storage + + + com.metamx + druid + 0.6.0-SNAPSHOT + + + + + com.metamx.druid + druid-server + ${project.parent.version} + + + com.netflix.astyanax + astyanax + 1.0.1 + + + + + junit + junit + test + + + diff --git a/server/src/main/java/io/druid/segment/loading/cassandra/CassandraDataSegmentConfig.java b/cassandra-storage/src/main/java/io/druid/segment/loading/cassandra/CassandraDataSegmentConfig.java similarity index 86% rename from server/src/main/java/io/druid/segment/loading/cassandra/CassandraDataSegmentConfig.java rename to cassandra-storage/src/main/java/io/druid/segment/loading/cassandra/CassandraDataSegmentConfig.java index d9551a78782..2a8fdafab84 100644 --- a/server/src/main/java/io/druid/segment/loading/cassandra/CassandraDataSegmentConfig.java +++ b/cassandra-storage/src/main/java/io/druid/segment/loading/cassandra/CassandraDataSegmentConfig.java @@ -21,18 +21,22 @@ package io.druid.segment.loading.cassandra; import com.fasterxml.jackson.annotation.JsonProperty; +import javax.validation.constraints.NotNull; + /** * Cassandra Config * * @author boneill42 */ -public abstract class CassandraDataSegmentConfig +public class CassandraDataSegmentConfig { @JsonProperty - public String host = ""; + @NotNull + public String host = null; @JsonProperty - public String keyspace = ""; + @NotNull + public String keyspace = null; public String getKeyspace() { diff --git a/server/src/main/java/io/druid/segment/loading/cassandra/CassandraDataSegmentPuller.java b/cassandra-storage/src/main/java/io/druid/segment/loading/cassandra/CassandraDataSegmentPuller.java similarity index 100% rename from server/src/main/java/io/druid/segment/loading/cassandra/CassandraDataSegmentPuller.java rename to cassandra-storage/src/main/java/io/druid/segment/loading/cassandra/CassandraDataSegmentPuller.java diff --git a/server/src/main/java/io/druid/segment/loading/cassandra/CassandraDataSegmentPusher.java b/cassandra-storage/src/main/java/io/druid/segment/loading/cassandra/CassandraDataSegmentPusher.java similarity index 100% rename from server/src/main/java/io/druid/segment/loading/cassandra/CassandraDataSegmentPusher.java rename to cassandra-storage/src/main/java/io/druid/segment/loading/cassandra/CassandraDataSegmentPusher.java diff --git a/cassandra-storage/src/main/java/io/druid/segment/loading/cassandra/CassandraDruidModule.java b/cassandra-storage/src/main/java/io/druid/segment/loading/cassandra/CassandraDruidModule.java new file mode 100644 index 00000000000..e50e748110a --- /dev/null +++ b/cassandra-storage/src/main/java/io/druid/segment/loading/cassandra/CassandraDruidModule.java @@ -0,0 +1,59 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.segment.loading.cassandra; + +import com.fasterxml.jackson.databind.Module; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Key; +import io.druid.guice.DruidBinders; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; +import io.druid.guice.PolyBind; +import io.druid.initialization.DruidModule; +import io.druid.segment.loading.DataSegmentPusher; + +import java.util.List; + +/** + */ +public class CassandraDruidModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of(); + } + + @Override + public void configure(Binder binder) + { + DruidBinders.dataSegmentPullerBinder(binder) + .addBinding("c*") + .to(CassandraDataSegmentPuller.class) + .in(LazySingleton.class); + + PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class)) + .addBinding("c*") + .to(CassandraDataSegmentPusher.class) + .in(LazySingleton.class); + JsonConfigProvider.bind(binder, "druid.pusher", CassandraDataSegmentConfig.class); + } +} diff --git a/server/src/main/java/io/druid/segment/loading/cassandra/CassandraStorage.java b/cassandra-storage/src/main/java/io/druid/segment/loading/cassandra/CassandraStorage.java similarity index 100% rename from server/src/main/java/io/druid/segment/loading/cassandra/CassandraStorage.java rename to cassandra-storage/src/main/java/io/druid/segment/loading/cassandra/CassandraStorage.java diff --git a/cassandra-storage/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/cassandra-storage/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 00000000000..ade20b5ba28 --- /dev/null +++ b/cassandra-storage/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.segment.loading.cassandra.CassandraDruidModule \ No newline at end of file diff --git a/common/src/main/java/io/druid/guice/DruidSecondaryModule.java b/common/src/main/java/io/druid/guice/DruidSecondaryModule.java index 7ace75f97a6..fbafb29d42b 100644 --- a/common/src/main/java/io/druid/guice/DruidSecondaryModule.java +++ b/common/src/main/java/io/druid/guice/DruidSecondaryModule.java @@ -68,7 +68,6 @@ public class DruidSecondaryModule implements Module @Override public void configure(Binder binder) { - binder.requireExplicitBindings(); binder.install(new DruidGuiceExtensions()); binder.bind(Properties.class).toInstance(properties); binder.bind(ConfigurationObjectFactory.class).toInstance(factory); diff --git a/examples/src/main/java/druid/examples/guice/RealtimeExampleModule.java b/examples/src/main/java/druid/examples/guice/RealtimeExampleModule.java deleted file mode 100644 index 816a8d367da..00000000000 --- a/examples/src/main/java/druid/examples/guice/RealtimeExampleModule.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package druid.examples.guice; - -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; -import com.google.inject.TypeLiteral; -import com.metamx.common.guava.LazySequence; -import com.metamx.common.logger.Logger; -import druid.examples.flights.FlightsFirehoseFactory; -import druid.examples.rand.RandomFirehoseFactory; -import druid.examples.twitter.TwitterSpritzerFirehoseFactory; -import druid.examples.web.WebFirehoseFactory; -import io.druid.client.DruidServer; -import io.druid.client.InventoryView; -import io.druid.client.ServerView; -import io.druid.guice.FireDepartmentsProvider; -import io.druid.guice.JsonConfigProvider; -import io.druid.guice.LazySingleton; -import io.druid.guice.ManageLifecycle; -import io.druid.guice.NoopSegmentPublisherProvider; -import io.druid.guice.RealtimeManagerConfig; -import io.druid.initialization.DruidModule; -import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.realtime.FireDepartment; -import io.druid.segment.realtime.RealtimeManager; -import io.druid.segment.realtime.SegmentPublisher; -import io.druid.server.coordination.DataSegmentAnnouncer; -import io.druid.timeline.DataSegment; - -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.Executor; - -/** - */ -public class RealtimeExampleModule implements DruidModule -{ - private static final Logger log = new Logger(RealtimeExampleModule.class); - - @Override - public void configure(Binder binder) - { - binder.bind(SegmentPublisher.class).toProvider(NoopSegmentPublisherProvider.class); - binder.bind(DataSegmentPusher.class).to(NoopDataSegmentPusher.class); - binder.bind(DataSegmentAnnouncer.class).to(NoopDataSegmentAnnouncer.class); - binder.bind(InventoryView.class).to(NoopInventoryView.class); - binder.bind(ServerView.class).to(NoopServerView.class); - - JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class); - binder.bind( - new TypeLiteral>() - { - } - ).toProvider(FireDepartmentsProvider.class).in(LazySingleton.class); - binder.bind(RealtimeManager.class).in(ManageLifecycle.class); - } - - @Override - public List getJacksonModules() - { - return Arrays.asList( - new SimpleModule("RealtimeExampleModule") - .registerSubtypes( - new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), - new NamedType(FlightsFirehoseFactory.class, "flights"), - new NamedType(RandomFirehoseFactory.class, "rand"), - new NamedType(WebFirehoseFactory.class, "webstream") - ) - ); - } - - private static class NoopServerView implements ServerView - { - @Override - public void registerServerCallback( - Executor exec, ServerCallback callback - ) - { - // do nothing - } - - @Override - public void registerSegmentCallback( - Executor exec, SegmentCallback callback - ) - { - // do nothing - } - } - - private static class NoopInventoryView implements InventoryView - { - @Override - public DruidServer getInventoryValue(String string) - { - return null; - } - - @Override - public Iterable getInventory() - { - return ImmutableList.of(); - } - } - - private static class NoopDataSegmentPusher implements DataSegmentPusher - { - @Override - public DataSegment push(File file, DataSegment segment) throws IOException - { - return segment; - } - } - - private static class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer - { - @Override - public void announceSegment(DataSegment segment) throws IOException - { - // do nothing - } - - @Override - public void unannounceSegment(DataSegment segment) throws IOException - { - // do nothing - } - - @Override - public void announceSegments(Iterable segments) throws IOException - { - // do nothing - } - - @Override - public void unannounceSegments(Iterable segments) throws IOException - { - // do nothing - } - } -} diff --git a/indexing-service/src/main/java/io/druid/guice/MiddleManagerModule.java b/indexing-service/src/main/java/io/druid/guice/MiddleManagerModule.java deleted file mode 100644 index ed8adb394a2..00000000000 --- a/indexing-service/src/main/java/io/druid/guice/MiddleManagerModule.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.guice; - -import com.google.inject.Binder; -import com.google.inject.Module; -import com.google.inject.Provides; -import io.druid.guice.annotations.Self; -import io.druid.indexing.coordinator.ForkingTaskRunner; -import io.druid.indexing.coordinator.TaskRunner; -import io.druid.indexing.worker.Worker; -import io.druid.indexing.worker.WorkerCuratorCoordinator; -import io.druid.indexing.worker.WorkerTaskMonitor; -import io.druid.indexing.worker.config.WorkerConfig; -import io.druid.server.DruidNode; - -/** - */ -public class MiddleManagerModule implements Module -{ - @Override - public void configure(Binder binder) - { - IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder); - - JsonConfigProvider.bind(binder, "druid.worker", WorkerConfig.class); - - binder.bind(TaskRunner.class).to(ForkingTaskRunner.class); - binder.bind(ForkingTaskRunner.class).in(LazySingleton.class); - - binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class); - binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class); - } - - @Provides @LazySingleton - public Worker getWorker(@Self DruidNode node, WorkerConfig config) - { - return new Worker( - node.getHost(), - config.getIp(), - config.getCapacity(), - config.getVersion() - ); - } -} diff --git a/indexing-service/src/main/java/io/druid/guice/OverlordModule.java b/indexing-service/src/main/java/io/druid/guice/OverlordModule.java deleted file mode 100644 index 09d5d48c313..00000000000 --- a/indexing-service/src/main/java/io/druid/guice/OverlordModule.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.guice; - -import com.google.inject.Binder; -import com.google.inject.Key; -import com.google.inject.Module; -import com.google.inject.TypeLiteral; -import com.google.inject.multibindings.MapBinder; -import io.druid.indexing.common.actions.LocalTaskActionClientFactory; -import io.druid.indexing.common.actions.TaskActionClientFactory; -import io.druid.indexing.common.actions.TaskActionToolbox; -import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; -import io.druid.indexing.common.tasklogs.TaskLogStreamer; -import io.druid.indexing.common.tasklogs.TaskLogs; -import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer; -import io.druid.indexing.coordinator.DbTaskStorage; -import io.druid.indexing.coordinator.ForkingTaskRunnerFactory; -import io.druid.indexing.coordinator.HeapMemoryTaskStorage; -import io.druid.indexing.coordinator.IndexerDBCoordinator; -import io.druid.indexing.coordinator.RemoteTaskRunnerFactory; -import io.druid.indexing.coordinator.TaskLockbox; -import io.druid.indexing.coordinator.TaskMaster; -import io.druid.indexing.coordinator.TaskQueue; -import io.druid.indexing.coordinator.TaskRunnerFactory; -import io.druid.indexing.coordinator.TaskStorage; -import io.druid.indexing.coordinator.TaskStorageQueryAdapter; -import io.druid.indexing.coordinator.http.OverlordRedirectInfo; -import io.druid.indexing.coordinator.scaling.AutoScalingStrategy; -import io.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy; -import io.druid.indexing.coordinator.scaling.NoopAutoScalingStrategy; -import io.druid.indexing.coordinator.scaling.ResourceManagementSchedulerConfig; -import io.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory; -import io.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactoryImpl; -import io.druid.indexing.coordinator.scaling.ResourceManagementStrategy; -import io.druid.indexing.coordinator.scaling.SimpleResourceManagementConfig; -import io.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy; -import io.druid.indexing.coordinator.setup.WorkerSetupData; -import io.druid.server.http.RedirectFilter; -import io.druid.server.http.RedirectInfo; - -import java.util.List; - -/** - */ -public class OverlordModule implements Module -{ - @Override - public void configure(Binder binder) - { - binder.bind(TaskMaster.class).in(ManageLifecycle.class); - - binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class); - binder.bind(new TypeLiteral>(){}) - .toProvider( - new ListProvider() - .add(TaskRunnerTaskLogStreamer.class) - .add(TaskLogs.class) - ) - .in(LazySingleton.class); - - binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class); - binder.bind(TaskActionToolbox.class).in(LazySingleton.class); - binder.bind(TaskQueue.class).in(LazySingleton.class); // Lifecycle managed by TaskMaster instead - binder.bind(IndexerDBCoordinator.class).in(LazySingleton.class); - binder.bind(TaskLockbox.class).in(LazySingleton.class); - binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class); - binder.bind(ResourceManagementSchedulerFactory.class) - .to(ResourceManagementSchedulerFactoryImpl.class) - .in(LazySingleton.class); - - configureTaskStorage(binder); - configureRunners(binder); - configureAutoscale(binder); - - binder.bind(RedirectFilter.class).in(LazySingleton.class); - binder.bind(RedirectInfo.class).to(OverlordRedirectInfo.class).in(LazySingleton.class); - } - - private void configureTaskStorage(Binder binder) - { - PolyBind.createChoice( - binder, "druid.indexer.storage.type", Key.get(TaskStorage.class), Key.get(HeapMemoryTaskStorage.class) - ); - final MapBinder storageBinder = PolyBind.optionBinder(binder, Key.get(TaskStorage.class)); - - storageBinder.addBinding("local").to(HeapMemoryTaskStorage.class); - binder.bind(HeapMemoryTaskStorage.class).in(LazySingleton.class); - - storageBinder.addBinding("db").to(DbTaskStorage.class); - binder.bind(DbTaskStorage.class).in(LazySingleton.class); - } - - private void configureRunners(Binder binder) - { - PolyBind.createChoice( - binder, "druid.indexer.runner.type", Key.get(TaskRunnerFactory.class), Key.get(ForkingTaskRunnerFactory.class) - ); - final MapBinder biddy = PolyBind.optionBinder(binder, Key.get(TaskRunnerFactory.class)); - - IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder); - biddy.addBinding("local").to(ForkingTaskRunnerFactory.class); - binder.bind(ForkingTaskRunnerFactory.class).in(LazySingleton.class); - - biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class); - binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class); - } - - private void configureAutoscale(Binder binder) - { - JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class); - binder.bind(ResourceManagementStrategy.class).to(SimpleResourceManagementStrategy.class).in(LazySingleton.class); - - JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null); - - PolyBind.createChoice( - binder, - "druid.indexer.autoscale.strategy", - Key.get(AutoScalingStrategy.class), - Key.get(NoopAutoScalingStrategy.class) - ); - - final MapBinder autoScalingBinder = PolyBind.optionBinder( - binder, Key.get(AutoScalingStrategy.class) - ); - autoScalingBinder.addBinding("ec2").to(EC2AutoScalingStrategy.class); - binder.bind(EC2AutoScalingStrategy.class).in(LazySingleton.class); - - autoScalingBinder.addBinding("noop").to(NoopAutoScalingStrategy.class); - binder.bind(NoopAutoScalingStrategy.class).in(LazySingleton.class); - - JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class); - } -} diff --git a/indexing-service/src/main/java/io/druid/guice/PeonModule.java b/indexing-service/src/main/java/io/druid/guice/PeonModule.java deleted file mode 100644 index fafb7d7094a..00000000000 --- a/indexing-service/src/main/java/io/druid/guice/PeonModule.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.guice; - -import com.google.inject.Binder; -import com.google.inject.Key; -import com.google.inject.Module; -import com.google.inject.multibindings.MapBinder; -import io.druid.indexing.common.RetryPolicyConfig; -import io.druid.indexing.common.RetryPolicyFactory; -import io.druid.indexing.common.TaskToolboxFactory; -import io.druid.indexing.common.actions.RemoteTaskActionClientFactory; -import io.druid.indexing.common.actions.TaskActionClientFactory; -import io.druid.indexing.common.config.TaskConfig; -import io.druid.indexing.common.index.ChatHandlerProvider; -import io.druid.indexing.common.index.EventReceivingChatHandlerProvider; -import io.druid.indexing.common.index.NoopChatHandlerProvider; -import io.druid.indexing.coordinator.TaskRunner; -import io.druid.indexing.coordinator.ThreadPoolTaskRunner; -import io.druid.indexing.worker.executor.ExecutorLifecycle; -import io.druid.indexing.worker.executor.ExecutorLifecycleConfig; -import io.druid.segment.loading.DataSegmentKiller; -import io.druid.segment.loading.S3DataSegmentKiller; - -/** - */ -public class PeonModule implements Module -{ - private final ExecutorLifecycleConfig config; - - public PeonModule( - ExecutorLifecycleConfig config - ) - { - this.config = config; - } - - @Override - public void configure(Binder binder) - { - PolyBind.createChoice( - binder, - "druid.indexer.task.chathandler.type", - Key.get(ChatHandlerProvider.class), - Key.get(NoopChatHandlerProvider.class) - ); - final MapBinder handlerProviderBinder = PolyBind.optionBinder( - binder, Key.get(ChatHandlerProvider.class) - ); - handlerProviderBinder.addBinding("curator").to(EventReceivingChatHandlerProvider.class); - handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class); - - binder.bind(TaskToolboxFactory.class).in(LazySingleton.class); - - JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class); - JsonConfigProvider.bind(binder, "druid.worker.taskActionClient.retry", RetryPolicyConfig.class); - - binder.bind(TaskActionClientFactory.class).to(RemoteTaskActionClientFactory.class).in(LazySingleton.class); - binder.bind(RetryPolicyFactory.class).in(LazySingleton.class); - - binder.bind(DataSegmentKiller.class).to(S3DataSegmentKiller.class).in(LazySingleton.class); - - binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class); - binder.bind(ExecutorLifecycleConfig.class).toInstance(config); - - binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class).in(LazySingleton.class); - binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class); - } -} diff --git a/server/src/main/java/io/druid/guice/LocalDataSegmentPusherProvider.java b/indexing-service/src/main/java/io/druid/indexing/common/SegmentLoaderFactory.java similarity index 54% rename from server/src/main/java/io/druid/guice/LocalDataSegmentPusherProvider.java rename to indexing-service/src/main/java/io/druid/indexing/common/SegmentLoaderFactory.java index 3af8510dd1d..d6932bc97c9 100644 --- a/server/src/main/java/io/druid/guice/LocalDataSegmentPusherProvider.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/SegmentLoaderFactory.java @@ -17,27 +17,35 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.guice; +package io.druid.indexing.common; -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.LocalDataSegmentPusher; -import io.druid.segment.loading.LocalDataSegmentPusherConfig; +import com.google.inject.Inject; +import io.druid.segment.loading.OmniSegmentLoader; +import io.druid.segment.loading.SegmentLoader; +import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.segment.loading.StorageLocationConfig; -import javax.validation.constraints.NotNull; +import java.io.File; +import java.util.Arrays; /** */ -public class LocalDataSegmentPusherProvider extends LocalDataSegmentPusherConfig implements DataSegmentPusherProvider +public class SegmentLoaderFactory { - @JacksonInject - @NotNull - private ObjectMapper jsonMapper = null; + private final OmniSegmentLoader loader; - @Override - public DataSegmentPusher get() + @Inject + public SegmentLoaderFactory( + OmniSegmentLoader loader + ) { - return new LocalDataSegmentPusher(this, jsonMapper); + this.loader = loader; + } + + public SegmentLoader manufacturate(File storageDir) + { + return loader.withConfig( + new SegmentLoaderConfig().withLocations(Arrays.asList(new StorageLocationConfig().setPath(storageDir))) + ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index 51e0d227d21..0a7a505d4ec 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -59,6 +59,7 @@ public class TaskToolbox private final ExecutorService queryExecutorService; private final SegmentLoader segmentLoader; private final ObjectMapper objectMapper; + private final File taskWorkDir; public TaskToolbox( TaskConfig config, @@ -73,7 +74,8 @@ public class TaskToolbox ExecutorService queryExecutorService, MonitorScheduler monitorScheduler, SegmentLoader segmentLoader, - ObjectMapper objectMapper + ObjectMapper objectMapper, + final File taskWorkDir ) { this.config = config; @@ -89,6 +91,7 @@ public class TaskToolbox this.monitorScheduler = monitorScheduler; this.segmentLoader = segmentLoader; this.objectMapper = objectMapper; + this.taskWorkDir = taskWorkDir; } public TaskConfig getConfig() @@ -159,6 +162,6 @@ public class TaskToolbox public File getTaskWorkDir() { - return new File(new File(config.getBaseTaskDir(), task.getId()), "work"); + return taskWorkDir; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index 0cac5acbce7..d20480b9de0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -31,9 +31,9 @@ import io.druid.indexing.common.task.Task; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.SegmentLoader; import io.druid.server.coordination.DataSegmentAnnouncer; +import java.io.File; import java.util.concurrent.ExecutorService; /** @@ -51,7 +51,7 @@ public class TaskToolboxFactory private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private final ExecutorService queryExecutorService; private final MonitorScheduler monitorScheduler; - private final SegmentLoader segmentLoader; + private final SegmentLoaderFactory segmentLoaderFactory; private final ObjectMapper objectMapper; @Inject @@ -66,7 +66,7 @@ public class TaskToolboxFactory QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, @Processing ExecutorService queryExecutorService, MonitorScheduler monitorScheduler, - SegmentLoader segmentLoader, + SegmentLoaderFactory segmentLoaderFactory, ObjectMapper objectMapper ) { @@ -80,12 +80,14 @@ public class TaskToolboxFactory this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; this.queryExecutorService = queryExecutorService; this.monitorScheduler = monitorScheduler; - this.segmentLoader = segmentLoader; + this.segmentLoaderFactory = segmentLoaderFactory; this.objectMapper = objectMapper; } public TaskToolbox build(Task task) { + final File taskWorkDir = new File(new File(config.getBaseTaskDir(), task.getId()), "work"); + return new TaskToolbox( config, task, @@ -98,8 +100,9 @@ public class TaskToolboxFactory queryRunnerFactoryConglomerate, queryExecutorService, monitorScheduler, - segmentLoader, - objectMapper + segmentLoaderFactory.manufacturate(taskWorkDir), + objectMapper, + taskWorkDir ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java index d6c2fc81bc3..230f074db72 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java @@ -56,8 +56,12 @@ public class NoopTask extends AbstractTask @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { + final int sleepTime = 2500; + log.info("Running noop task[%s]", getId()); - Thread.sleep(2500); + log.info("Sleeping for %,d millis.", sleepTime); + Thread.sleep(sleepTime); + log.info("Woke up!"); return TaskStatus.success(getId()); } } diff --git a/pom.xml b/pom.xml index f416946ff99..6396ebc8be2 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,7 @@ server services processing + cassandra-storage diff --git a/server/pom.xml b/server/pom.xml index 9d8b333e50d..1259afa949a 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -199,11 +199,6 @@ java-xmlbuilder true - - com.netflix.astyanax - astyanax - 1.0.1 - org.antlr antlr4-runtime diff --git a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java index 20d52e8e77f..3f577f2d64f 100644 --- a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java @@ -19,10 +19,8 @@ package io.druid.curator.discovery; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.inject.Binder; import com.google.inject.Injector; @@ -33,18 +31,30 @@ import com.google.inject.TypeLiteral; import com.google.inject.name.Named; import com.google.inject.name.Names; import com.metamx.common.lifecycle.Lifecycle; +import io.druid.guice.DruidBinders; import io.druid.guice.JsonConfigProvider; +import io.druid.guice.KeyHolder; import io.druid.guice.LazySingleton; import io.druid.server.DruidNode; import io.druid.server.initialization.CuratorDiscoveryConfig; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.discovery.ProviderStrategy; +import org.apache.curator.x.discovery.ServiceCache; +import org.apache.curator.x.discovery.ServiceCacheBuilder; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.ServiceProvider; +import org.apache.curator.x.discovery.ServiceProviderBuilder; +import org.apache.curator.x.discovery.details.ServiceCacheListener; -import javax.annotation.Nullable; +import java.io.IOException; import java.lang.annotation.Annotation; +import java.util.Collection; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; /** * The DiscoveryModule allows for the registration of Keys of DruidNode objects, which it intends to be @@ -57,21 +67,16 @@ public class DiscoveryModule implements Module { private static final String NAME = "DiscoveryModule:internal"; - public final List>> nodesToAnnounce = new CopyOnWriteArrayList>>(); - public boolean configured = false; - /** * Requests that the un-annotated DruidNode instance be injected and published as part of the lifecycle. * * That is, this module will announce the DruidNode instance returned by * injector.getInstance(Key.get(DruidNode.class)) automatically. * Announcement will happen in the LAST stage of the Lifecycle - * - * @return this, for chaining. */ - public DiscoveryModule registerDefault() + public static void registerDefault(Binder binder) { - return registerKey(Key.get(new TypeLiteral>(){})); + registerKey(binder, Key.get(new TypeLiteral(){})); } /** @@ -82,11 +87,10 @@ public class DiscoveryModule implements Module * Announcement will happen in the LAST stage of the Lifecycle * * @param annotation The annotation instance to use in finding the DruidNode instance, usually a Named annotation - * @return this, for chaining. */ - public DiscoveryModule register(Annotation annotation) + public static void register(Binder binder, Annotation annotation) { - return registerKey(Key.get(new TypeLiteral>(){}, annotation)); + registerKey(binder, Key.get(new TypeLiteral(){}, annotation)); } /** @@ -97,11 +101,10 @@ public class DiscoveryModule implements Module * Announcement will happen in the LAST stage of the Lifecycle * * @param annotation The annotation class to use in finding the DruidNode instance - * @return this, for chaining */ - public DiscoveryModule register(Class annotation) + public static void register(Binder binder, Class annotation) { - return registerKey(Key.get(new TypeLiteral>(){}, annotation)); + registerKey(binder, Key.get(new TypeLiteral(){}, annotation)); } /** @@ -112,67 +115,53 @@ public class DiscoveryModule implements Module * Announcement will happen in the LAST stage of the Lifecycle * * @param key The key to use in finding the DruidNode instance - * @return this, for chaining */ - public DiscoveryModule registerKey(Key> key) + public static void registerKey(Binder binder, Key key) { - synchronized (nodesToAnnounce) { - Preconditions.checkState(!configured, "Cannot register key[%s] after configuration.", key); - } - nodesToAnnounce.add(key); - return this; + DruidBinders.discoveryAnnouncementBinder(binder).addBinding().toInstance(new KeyHolder<>(key)); } @Override public void configure(Binder binder) { - synchronized (nodesToAnnounce) { - configured = true; - JsonConfigProvider.bind(binder, "druid.discovery.curator", CuratorDiscoveryConfig.class); + JsonConfigProvider.bind(binder, "druid.discovery.curator", CuratorDiscoveryConfig.class); - binder.bind(CuratorServiceAnnouncer.class).in(LazySingleton.class); + binder.bind(CuratorServiceAnnouncer.class).in(LazySingleton.class); - // We bind this eagerly so that it gets instantiated and registers stuff with Lifecycle as a side-effect - binder.bind(ServiceAnnouncer.class) - .to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME))) - .asEagerSingleton(); - } + // Build the binder so that it will at a minimum inject an empty set. + DruidBinders.discoveryAnnouncementBinder(binder); + + // We bind this eagerly so that it gets instantiated and registers stuff with Lifecycle as a side-effect + binder.bind(ServiceAnnouncer.class) + .to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME))) + .asEagerSingleton(); } @Provides @LazySingleton @Named(NAME) public CuratorServiceAnnouncer getServiceAnnouncer( final CuratorServiceAnnouncer announcer, final Injector injector, + final Set> nodesToAnnounce, final Lifecycle lifecycle ) { lifecycle.addHandler( new Lifecycle.Handler() { - private volatile List> nodes = null; + private volatile List nodes = null; @Override public void start() throws Exception { if (nodes == null) { - nodes = Lists.transform( - nodesToAnnounce, - new Function>, Supplier>() - { - @Nullable - @Override - public Supplier apply( - @Nullable Key> input - ) - { - return injector.getInstance(input); - } - } - ); + nodes = Lists.newArrayList(); + for (KeyHolder holder : nodesToAnnounce) { + nodes.add(injector.getInstance(holder.getKey())); + } } - for (Supplier node : nodes) { - announcer.announce(node.get()); + for (DruidNode node : nodes) { + announcer.announce(node); } } @@ -180,8 +169,8 @@ public class DiscoveryModule implements Module public void stop() { if (nodes != null) { - for (Supplier node : nodes) { - announcer.unannounce(node.get()); + for (DruidNode node : nodes) { + announcer.unannounce(node); } } } @@ -195,13 +184,17 @@ public class DiscoveryModule implements Module @Provides @LazySingleton public ServiceDiscovery getServiceDiscovery( CuratorFramework curator, - Supplier config, + CuratorDiscoveryConfig config, Lifecycle lifecycle ) throws Exception { + if (!config.useDiscovery()) { + return new NoopServiceDiscovery<>(); + } + final ServiceDiscovery serviceDiscovery = ServiceDiscoveryBuilder.builder(Void.class) - .basePath(config.get().getPath()) + .basePath(config.getPath()) .client(curator) .build(); @@ -230,4 +223,183 @@ public class DiscoveryModule implements Module return serviceDiscovery; } + + private static class NoopServiceDiscovery implements ServiceDiscovery + { + @Override + public void start() throws Exception + { + + } + + @Override + public void registerService(ServiceInstance service) throws Exception + { + + } + + @Override + public void updateService(ServiceInstance service) throws Exception + { + + } + + @Override + public void unregisterService(ServiceInstance service) throws Exception + { + + } + + @Override + public ServiceCacheBuilder serviceCacheBuilder() + { + return new NoopServiceCacheBuilder<>(); + } + + @Override + public Collection queryForNames() throws Exception + { + return ImmutableList.of(); + } + + @Override + public Collection> queryForInstances(String name) throws Exception + { + return ImmutableList.of(); + } + + @Override + public ServiceInstance queryForInstance(String name, String id) throws Exception + { + return null; + } + + @Override + public ServiceProviderBuilder serviceProviderBuilder() + { + return new NoopServiceProviderBuilder<>(); + } + + @Override + public void close() throws IOException + { + + } + } + + private static class NoopServiceCacheBuilder implements ServiceCacheBuilder + { + @Override + public ServiceCache build() + { + return new NoopServiceCache<>(); + } + + @Override + public ServiceCacheBuilder name(String name) + { + return this; + } + + @Override + public ServiceCacheBuilder threadFactory(ThreadFactory threadFactory) + { + return this; + } + + private static class NoopServiceCache implements ServiceCache + { + @Override + public List> getInstances() + { + return ImmutableList.of(); + } + + @Override + public void start() throws Exception + { + + } + + @Override + public void close() throws IOException + { + + } + + @Override + public void addListener(ServiceCacheListener listener) + { + + } + + @Override + public void addListener( + ServiceCacheListener listener, Executor executor + ) + { + + } + + @Override + public void removeListener(ServiceCacheListener listener) + { + + } + } + } + + private static class NoopServiceProviderBuilder implements ServiceProviderBuilder + { + @Override + public ServiceProvider build() + { + return new NoopServiceProvider<>(); + } + + @Override + public ServiceProviderBuilder serviceName(String serviceName) + { + return this; + } + + @Override + public ServiceProviderBuilder providerStrategy(ProviderStrategy providerStrategy) + { + return this; + } + + @Override + public ServiceProviderBuilder threadFactory(ThreadFactory threadFactory) + { + return this; + } + + @Override + public ServiceProviderBuilder refreshPaddingMs(int refreshPaddingMs) + { + return this; + } + } + + private static class NoopServiceProvider implements ServiceProvider + { + @Override + public void start() throws Exception + { + + } + + @Override + public ServiceInstance getInstance() throws Exception + { + return null; + } + + @Override + public void close() throws IOException + { + + } + } } diff --git a/server/src/main/java/io/druid/guice/BrokerModule.java b/server/src/main/java/io/druid/guice/BrokerModule.java deleted file mode 100644 index 8ebc51f9832..00000000000 --- a/server/src/main/java/io/druid/guice/BrokerModule.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.guice; - -import com.google.common.base.Supplier; -import com.google.inject.Binder; -import com.google.inject.Module; -import com.google.inject.TypeLiteral; -import io.druid.client.BrokerServerView; -import io.druid.client.CachingClusteredClient; -import io.druid.client.TimelineServerView; -import io.druid.client.cache.Cache; -import io.druid.client.cache.CacheProvider; -import io.druid.collections.ResourceHolder; -import io.druid.collections.StupidPool; -import io.druid.guice.annotations.Global; -import io.druid.query.MapQueryToolChestWarehouse; -import io.druid.query.QueryToolChestWarehouse; - -import java.nio.ByteBuffer; - -/** - */ -public class BrokerModule implements Module -{ - @Override - public void configure(Binder binder) - { - binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class); - - binder.bind(CachingClusteredClient.class).in(LazySingleton.class); - binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class); - - binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class); - JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class); - - // This is a workaround and needs to be made better in the near future. - binder.bind( - new TypeLiteral>() - { - } - ).annotatedWith(Global.class).toInstance(new NoopStupidPool(null)); - } - - private static class NoopStupidPool extends StupidPool - { - public NoopStupidPool(Supplier generator) - { - super(generator); - } - - @Override - public ResourceHolder take() - { - throw new UnsupportedOperationException(); - } - } -} diff --git a/server/src/main/java/io/druid/guice/CassandraDataSegmentPusherProvider.java b/server/src/main/java/io/druid/guice/CassandraDataSegmentPusherProvider.java deleted file mode 100644 index 7ce7e2219fa..00000000000 --- a/server/src/main/java/io/druid/guice/CassandraDataSegmentPusherProvider.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.guice; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.cassandra.CassandraDataSegmentConfig; -import io.druid.segment.loading.cassandra.CassandraDataSegmentPusher; - -import javax.validation.constraints.NotNull; - -/** - */ -public class CassandraDataSegmentPusherProvider implements DataSegmentPusherProvider -{ - @JacksonInject - @NotNull - private CassandraDataSegmentConfig config = null; - - @JacksonInject - @NotNull - private ObjectMapper jsonMapper = null; - - @Override - public DataSegmentPusher get() - { - return new CassandraDataSegmentPusher(config, jsonMapper); - } -} diff --git a/server/src/main/java/io/druid/guice/CoordinatorModule.java b/server/src/main/java/io/druid/guice/CoordinatorModule.java deleted file mode 100644 index df0e36a53c5..00000000000 --- a/server/src/main/java/io/druid/guice/CoordinatorModule.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.guice; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.inject.Binder; -import com.google.inject.Module; -import com.google.inject.Provides; -import com.metamx.common.concurrent.ScheduledExecutorFactory; -import io.druid.client.indexing.IndexingServiceClient; -import io.druid.db.DatabaseRuleManager; -import io.druid.db.DatabaseRuleManagerConfig; -import io.druid.db.DatabaseRuleManagerProvider; -import io.druid.db.DatabaseSegmentManager; -import io.druid.db.DatabaseSegmentManagerConfig; -import io.druid.db.DatabaseSegmentManagerProvider; -import io.druid.server.http.MasterRedirectInfo; -import io.druid.server.http.RedirectFilter; -import io.druid.server.http.RedirectInfo; -import io.druid.server.http.RedirectServlet; -import io.druid.server.master.DruidMaster; -import io.druid.server.master.DruidMasterConfig; -import io.druid.server.master.LoadQueueTaskMaster; -import org.apache.curator.framework.CuratorFramework; - -/** - */ -public class CoordinatorModule implements Module -{ - @Override - public void configure(Binder binder) - { - ConfigProvider.bind(binder, DruidMasterConfig.class); - - JsonConfigProvider.bind(binder, "druid.manager.segment", DatabaseSegmentManagerConfig.class); - JsonConfigProvider.bind(binder, "druid.manager.rules", DatabaseRuleManagerConfig.class); - - binder.bind(RedirectServlet.class).in(LazySingleton.class); - binder.bind(RedirectFilter.class).in(LazySingleton.class); - - binder.bind(DatabaseSegmentManager.class) - .toProvider(DatabaseSegmentManagerProvider.class) - .in(ManageLifecycle.class); - - binder.bind(DatabaseRuleManager.class) - .toProvider(DatabaseRuleManagerProvider.class) - .in(ManageLifecycle.class); - - binder.bind(IndexingServiceClient.class).in(LazySingleton.class); - - binder.bind(RedirectInfo.class).to(MasterRedirectInfo.class).in(LazySingleton.class); - - binder.bind(DruidMaster.class); - } - - @Provides @LazySingleton - public LoadQueueTaskMaster getLoadQueueTaskMaster( - CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, DruidMasterConfig config - ) - { - return new LoadQueueTaskMaster(curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"), config); - } -} diff --git a/server/src/main/java/io/druid/guice/DataSegmentPusherModule.java b/server/src/main/java/io/druid/guice/DataSegmentPusherModule.java deleted file mode 100644 index cd65b35b7ee..00000000000 --- a/server/src/main/java/io/druid/guice/DataSegmentPusherModule.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.guice; - -import com.google.inject.Binder; -import com.google.inject.Module; -import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.HdfsDataSegmentPusherConfig; -import io.druid.segment.loading.S3DataSegmentPusherConfig; -import io.druid.segment.loading.cassandra.CassandraDataSegmentConfig; -import org.apache.hadoop.conf.Configuration; - -/** - */ -public class DataSegmentPusherModule implements Module -{ - @Override - public void configure(Binder binder) - { - JsonConfigProvider.bind(binder, "druid.pusher", DataSegmentPusherProvider.class); - - JsonConfigProvider.bind(binder, "druid.pusher.s3", S3DataSegmentPusherConfig.class); - binder.bind(Configuration.class).toInstance(new Configuration()); - - JsonConfigProvider.bind(binder, "druid.pusher.hdfs", HdfsDataSegmentPusherConfig.class); - - JsonConfigProvider.bind(binder, "druid.pusher.cassandra", CassandraDataSegmentConfig.class); - - binder.bind(DataSegmentPusher.class).toProvider(DataSegmentPusherProvider.class); - } -} diff --git a/server/src/main/java/io/druid/guice/DataSegmentPusherProvider.java b/server/src/main/java/io/druid/guice/DataSegmentPusherProvider.java deleted file mode 100644 index 564f5002e94..00000000000 --- a/server/src/main/java/io/druid/guice/DataSegmentPusherProvider.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.guice; - -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.google.inject.Provider; -import io.druid.segment.loading.DataSegmentPusher; - -/** - */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = LocalDataSegmentPusherProvider.class) -@JsonSubTypes(value = { - @JsonSubTypes.Type(name = "s3_zip", value = S3DataSegmentPusherProvider.class), - @JsonSubTypes.Type(name = "hdfs", value = HdfsDataSegmentPusherProvider.class), - @JsonSubTypes.Type(name = "c*", value = CassandraDataSegmentPusherProvider.class) -}) -public interface DataSegmentPusherProvider extends Provider -{ -} diff --git a/server/src/main/java/io/druid/guice/DataSegmentPullerModule.java b/server/src/main/java/io/druid/guice/DataSegmentPusherPullerModule.java similarity index 61% rename from server/src/main/java/io/druid/guice/DataSegmentPullerModule.java rename to server/src/main/java/io/druid/guice/DataSegmentPusherPullerModule.java index 3fd70ceecab..9a1990f0424 100644 --- a/server/src/main/java/io/druid/guice/DataSegmentPullerModule.java +++ b/server/src/main/java/io/druid/guice/DataSegmentPusherPullerModule.java @@ -20,19 +20,25 @@ package io.druid.guice; import com.google.inject.Binder; +import com.google.inject.Key; import com.google.inject.Module; +import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.HdfsDataSegmentPuller; +import io.druid.segment.loading.HdfsDataSegmentPusher; +import io.druid.segment.loading.HdfsDataSegmentPusherConfig; import io.druid.segment.loading.LocalDataSegmentPuller; +import io.druid.segment.loading.LocalDataSegmentPusher; +import io.druid.segment.loading.LocalDataSegmentPusherConfig; import io.druid.segment.loading.OmniSegmentLoader; import io.druid.segment.loading.S3DataSegmentPuller; +import io.druid.segment.loading.S3DataSegmentPusher; +import io.druid.segment.loading.S3DataSegmentPusherConfig; import io.druid.segment.loading.SegmentLoader; -import io.druid.segment.loading.cassandra.CassandraDataSegmentConfig; -import io.druid.segment.loading.cassandra.CassandraDataSegmentPuller; import org.apache.hadoop.conf.Configuration; /** */ -public class DataSegmentPullerModule implements Module +public class DataSegmentPusherPullerModule implements Module { @Override public void configure(Binder binder) @@ -42,7 +48,10 @@ public class DataSegmentPullerModule implements Module bindDeepStorageLocal(binder); bindDeepStorageS3(binder); bindDeepStorageHdfs(binder); - bindDeepStorageCassandra(binder); + + PolyBind.createChoice( + binder, "druid.pusher.type", Key.get(DataSegmentPusher.class), Key.get(LocalDataSegmentPusher.class) + ); } private static void bindDeepStorageLocal(Binder binder) @@ -51,6 +60,12 @@ public class DataSegmentPullerModule implements Module .addBinding("local") .to(LocalDataSegmentPuller.class) .in(LazySingleton.class); + + PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class)) + .addBinding("local") + .to(LocalDataSegmentPusher.class) + .in(LazySingleton.class); + JsonConfigProvider.bind(binder, "druid.pusher", LocalDataSegmentPusherConfig.class); } private static void bindDeepStorageS3(Binder binder) @@ -59,6 +74,12 @@ public class DataSegmentPullerModule implements Module .addBinding("s3_zip") .to(S3DataSegmentPuller.class) .in(LazySingleton.class); + + PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class)) + .addBinding("s3") + .to(S3DataSegmentPusher.class) + .in(LazySingleton.class); + JsonConfigProvider.bind(binder, "druid.pusher", S3DataSegmentPusherConfig.class); } private static void bindDeepStorageHdfs(Binder binder) @@ -67,15 +88,13 @@ public class DataSegmentPullerModule implements Module .addBinding("hdfs") .to(HdfsDataSegmentPuller.class) .in(LazySingleton.class); - binder.bind(Configuration.class).toInstance(new Configuration()); - } - private static void bindDeepStorageCassandra(Binder binder) - { - DruidBinders.dataSegmentPullerBinder(binder) - .addBinding("c*") - .to(CassandraDataSegmentPuller.class) - .in(LazySingleton.class); - ConfigProvider.bind(binder, CassandraDataSegmentConfig.class); + binder.bind(Configuration.class).toInstance(new Configuration()); + + PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class)) + .addBinding("hdfs") + .to(HdfsDataSegmentPusher.class) + .in(LazySingleton.class); + JsonConfigProvider.bind(binder, "druid.pusher", HdfsDataSegmentPusherConfig.class); } } diff --git a/server/src/main/java/io/druid/guice/DruidBinders.java b/server/src/main/java/io/druid/guice/DruidBinders.java index 5bc84227cd5..1ebf72488ee 100644 --- a/server/src/main/java/io/druid/guice/DruidBinders.java +++ b/server/src/main/java/io/druid/guice/DruidBinders.java @@ -22,10 +22,13 @@ package io.druid.guice; import com.google.inject.Binder; import com.google.inject.TypeLiteral; import com.google.inject.multibindings.MapBinder; +import com.google.inject.multibindings.Multibinder; +import com.metamx.metrics.Monitor; import io.druid.query.Query; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; import io.druid.segment.loading.DataSegmentPuller; +import io.druid.server.DruidNode; /** */ @@ -34,20 +37,14 @@ public class DruidBinders public static MapBinder, QueryRunnerFactory> queryRunnerFactoryBinder(Binder binder) { return MapBinder.newMapBinder( - binder, new TypeLiteral>() - { - }, TypeLiteral.get(QueryRunnerFactory.class) + binder, new TypeLiteral>(){}, TypeLiteral.get(QueryRunnerFactory.class) ); } public static MapBinder, QueryToolChest> queryToolChestBinder(Binder binder) { return MapBinder.newMapBinder( - binder, new TypeLiteral>() - { - }, new TypeLiteral() - { - } + binder, new TypeLiteral>(){}, new TypeLiteral(){} ); } @@ -55,4 +52,14 @@ public class DruidBinders { return MapBinder.newMapBinder(binder, String.class, DataSegmentPuller.class); } + + public static Multibinder> discoveryAnnouncementBinder(Binder binder) + { + return Multibinder.newSetBinder(binder, new TypeLiteral>(){}); + } + + public static Multibinder> metricMonitorBinder(Binder binder) + { + return Multibinder.newSetBinder(binder, new TypeLiteral>(){}); + } } diff --git a/server/src/main/java/io/druid/guice/HdfsDataSegmentPusherProvider.java b/server/src/main/java/io/druid/guice/HdfsDataSegmentPusherProvider.java deleted file mode 100644 index c6cbf50d16a..00000000000 --- a/server/src/main/java/io/druid/guice/HdfsDataSegmentPusherProvider.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.guice; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.HdfsDataSegmentPusher; -import io.druid.segment.loading.HdfsDataSegmentPusherConfig; -import org.apache.hadoop.conf.Configuration; - -import javax.validation.constraints.NotNull; - -/** - */ -public class HdfsDataSegmentPusherProvider implements DataSegmentPusherProvider -{ - @JacksonInject - @NotNull - private HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConfig = null; - - @JacksonInject - @NotNull - private Configuration config = null; - - @JacksonInject - @NotNull - private ObjectMapper jsonMapper = null; - - @Override - public DataSegmentPusher get() - { - return new HdfsDataSegmentPusher(hdfsDataSegmentPusherConfig, config, jsonMapper); - } -} diff --git a/server/src/main/java/io/druid/guice/HistoricalModule.java b/server/src/main/java/io/druid/guice/NodeTypeConfig.java similarity index 63% rename from server/src/main/java/io/druid/guice/HistoricalModule.java rename to server/src/main/java/io/druid/guice/NodeTypeConfig.java index 0e00526351b..e0cb4b776d5 100644 --- a/server/src/main/java/io/druid/guice/HistoricalModule.java +++ b/server/src/main/java/io/druid/guice/NodeTypeConfig.java @@ -19,22 +19,21 @@ package io.druid.guice; -import com.google.inject.Binder; -import com.google.inject.Module; -import com.metamx.common.logger.Logger; -import io.druid.server.coordination.ServerManager; -import io.druid.server.coordination.ZkCoordinator; - /** */ -public class HistoricalModule implements Module +public class NodeTypeConfig { - private static final Logger log = new Logger(HistoricalModule.class); + private final String nodeType; - @Override - public void configure(Binder binder) + public NodeTypeConfig( + String nodeType + ) { - binder.bind(ServerManager.class).in(LazySingleton.class); - binder.bind(ZkCoordinator.class).in(ManageLifecycle.class); + this.nodeType = nodeType; + } + + public String getNodeType() + { + return nodeType; } } diff --git a/server/src/main/java/io/druid/guice/QueryableModule.java b/server/src/main/java/io/druid/guice/QueryableModule.java index 184f80a1b58..4a279e57d87 100644 --- a/server/src/main/java/io/druid/guice/QueryableModule.java +++ b/server/src/main/java/io/druid/guice/QueryableModule.java @@ -22,6 +22,7 @@ package io.druid.guice; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; +import com.google.inject.util.Providers; import io.druid.initialization.DruidModule; import io.druid.query.QuerySegmentWalker; import io.druid.server.QueryServlet; @@ -37,20 +38,11 @@ import java.util.List; */ public class QueryableModule implements DruidModule { - private final Class walkerClass; - - public QueryableModule( - Class walkerClass - ) - { - this.walkerClass = walkerClass; - } - @Override public void configure(Binder binder) { binder.bind(QueryServlet.class).in(LazySingleton.class); - binder.bind(QuerySegmentWalker.class).to(walkerClass).in(LazySingleton.class); + binder.bind(QuerySegmentWalker.class).toProvider(Providers.of(null)); binder.bind(RequestLogger.class).toProvider(RequestLoggerProvider.class).in(ManageLifecycle.class); JsonConfigProvider.bind(binder, "druid.request.logging", RequestLoggerProvider.class); } diff --git a/server/src/main/java/io/druid/guice/S3DataSegmentPusherProvider.java b/server/src/main/java/io/druid/guice/S3DataSegmentPusherProvider.java deleted file mode 100644 index 2cb9765cbf7..00000000000 --- a/server/src/main/java/io/druid/guice/S3DataSegmentPusherProvider.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.guice; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.S3DataSegmentPusher; -import io.druid.segment.loading.S3DataSegmentPusherConfig; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; - -import javax.validation.constraints.NotNull; - -/** - */ -public class S3DataSegmentPusherProvider implements DataSegmentPusherProvider -{ - @JacksonInject - @NotNull - private RestS3Service restS3Service = null; - - @JacksonInject - @NotNull - private S3DataSegmentPusherConfig config = null; - - @JacksonInject - @NotNull - private ObjectMapper jsonMapper = null; - - @Override - public DataSegmentPusher get() - { - return new S3DataSegmentPusher(restS3Service, config, jsonMapper); - } -} diff --git a/server/src/main/java/io/druid/guice/StorageNodeModule.java b/server/src/main/java/io/druid/guice/StorageNodeModule.java index 2ee270cc330..f50ab39bbb7 100644 --- a/server/src/main/java/io/druid/guice/StorageNodeModule.java +++ b/server/src/main/java/io/druid/guice/StorageNodeModule.java @@ -22,6 +22,8 @@ package io.druid.guice; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; +import com.google.inject.ProvisionException; +import com.google.inject.util.Providers; import io.druid.client.DruidServerConfig; import io.druid.guice.annotations.Self; import io.druid.query.DefaultQueryRunnerFactoryConglomerate; @@ -32,23 +34,19 @@ import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.server.DruidNode; import io.druid.server.coordination.DruidServerMetadata; +import javax.annotation.Nullable; + /** */ public class StorageNodeModule implements Module { - private final String nodeType; - - public StorageNodeModule(String nodeType) - { - this.nodeType = nodeType; - } - @Override public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class); JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class); + binder.bind(NodeTypeConfig.class).toProvider(Providers.of(null)); binder.bind(QueryableIndexFactory.class).to(MMappedQueryableIndexFactory.class).in(LazySingleton.class); binder.bind(QueryRunnerFactoryConglomerate.class) @@ -58,13 +56,17 @@ public class StorageNodeModule implements Module @Provides @LazySingleton - public DruidServerMetadata getMetadata(@Self DruidNode node, DruidServerConfig config) + public DruidServerMetadata getMetadata(@Self DruidNode node, @Nullable NodeTypeConfig nodeType, DruidServerConfig config) { + if (nodeType == null) { + throw new ProvisionException("Must override the binding for NodeTypeConfig if you want a DruidServerMetadata."); + } + return new DruidServerMetadata( node.getHost(), node.getHost(), config.getMaxSize(), - nodeType, + nodeType.getNodeType(), config.getTier() ); } diff --git a/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java b/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java index 31efc1b0ea9..bfec5093392 100644 --- a/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java +++ b/server/src/main/java/io/druid/segment/loading/OmniSegmentLoader.java @@ -65,6 +65,11 @@ public class OmniSegmentLoader implements SegmentLoader } } + public OmniSegmentLoader withConfig(SegmentLoaderConfig config) + { + return new OmniSegmentLoader(pullers, factory, config); + } + @Override public boolean isSegmentLoaded(final DataSegment segment) { diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java index f4034865022..a1339790a20 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java @@ -20,6 +20,7 @@ package io.druid.segment.loading; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; import org.hibernate.validator.constraints.NotEmpty; import java.io.File; @@ -58,6 +59,15 @@ public class SegmentLoaderConfig return infoDir; } + public SegmentLoaderConfig withLocations(List locations) + { + SegmentLoaderConfig retVal = new SegmentLoaderConfig(); + retVal.locations = Lists.newArrayList(locations); + retVal.deleteOnRemove = this.deleteOnRemove; + retVal.infoDir = this.infoDir; + return retVal; + } + @Override public String toString() { diff --git a/server/src/main/java/io/druid/segment/loading/StorageLocationConfig.java b/server/src/main/java/io/druid/segment/loading/StorageLocationConfig.java index 2ab5920313d..dc851681011 100644 --- a/server/src/main/java/io/druid/segment/loading/StorageLocationConfig.java +++ b/server/src/main/java/io/druid/segment/loading/StorageLocationConfig.java @@ -42,11 +42,23 @@ public class StorageLocationConfig return path; } + public StorageLocationConfig setPath(File path) + { + this.path = path; + return this; + } + public long getMaxSize() { return maxSize; } + public StorageLocationConfig setMaxSize(long maxSize) + { + this.maxSize = maxSize; + return this; + } + @Override public String toString() { diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index ee7135db0bb..4cbe042350a 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -94,7 +94,7 @@ public class ServerManager implements QuerySegmentWalker this.exec = exec; - this.dataSources = new HashMap>(); + this.dataSources = new HashMap<>(); } public Map getDataSourceSizes() diff --git a/server/src/main/java/io/druid/server/initialization/JettyServerModule.java b/server/src/main/java/io/druid/server/initialization/JettyServerModule.java index 80c6fb7cc4b..4453f0be30d 100644 --- a/server/src/main/java/io/druid/server/initialization/JettyServerModule.java +++ b/server/src/main/java/io/druid/server/initialization/JettyServerModule.java @@ -20,7 +20,6 @@ package io.druid.server.initialization; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.primitives.Ints; import com.google.inject.Binder; import com.google.inject.ConfigurationException; @@ -44,13 +43,13 @@ import io.druid.guice.LazySingleton; import io.druid.guice.annotations.JSR311Resource; import io.druid.guice.annotations.Self; import io.druid.server.DruidNode; +import io.druid.server.StatusResource; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.util.thread.QueuedThreadPool; import javax.servlet.ServletException; -import java.util.List; import java.util.Map; import java.util.Set; @@ -60,22 +59,6 @@ public class JettyServerModule extends JerseyServletModule { private static final Logger log = new Logger(JettyServerModule.class); - private final JettyServerInitializer initializer; - private final List> resources = Lists.newArrayList(); - - public JettyServerModule( - JettyServerInitializer initializer - ) - { - this.initializer = initializer; - } - - public JettyServerModule addResource(Class resource) - { - resources.add(resource); - return this; - } - @Override protected void configureServlets() { @@ -87,10 +70,8 @@ public class JettyServerModule extends JerseyServletModule binder.bind(DruidGuiceContainer.class).in(Scopes.SINGLETON); serve("/*").with(DruidGuiceContainer.class); - for (Class resource : resources) { - Jerseys.addResource(binder, resource); - binder.bind(resource).in(LazySingleton.class); - } + Jerseys.addResource(binder, StatusResource.class); + binder.bind(StatusResource.class).in(LazySingleton.class); binder.bind(Key.get(Server.class, Names.named("ForTheEagerness"))).to(Server.class).asEagerSingleton(); } @@ -121,6 +102,8 @@ public class JettyServerModule extends JerseyServletModule @Provides @LazySingleton public Server getServer(Injector injector, Lifecycle lifecycle, @Self DruidNode node, ServerConfig config) { + JettyServerInitializer initializer = injector.getInstance(JettyServerInitializer.class); + final Server server = makeJettyServer(node, config); try { initializer.initialize(server, injector); diff --git a/server/src/main/java/io/druid/server/metrics/MetricsModule.java b/server/src/main/java/io/druid/server/metrics/MetricsModule.java index ee7dd20c2f8..159cc412675 100644 --- a/server/src/main/java/io/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/io/druid/server/metrics/MetricsModule.java @@ -19,11 +19,10 @@ package io.druid.server.metrics; -import com.google.common.base.Preconditions; import com.google.common.base.Supplier; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.inject.Binder; -import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; @@ -34,14 +33,12 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.Monitor; import com.metamx.metrics.MonitorScheduler; import io.druid.concurrent.Execs; +import io.druid.guice.DruidBinders; import io.druid.guice.JsonConfigProvider; -import io.druid.guice.JsonConfigurator; -import io.druid.guice.LazySingleton; import io.druid.guice.ManageLifecycle; import java.util.List; -import java.util.Properties; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.Set; /** * Sets up the {@link MonitorScheduler} to monitor things on a regular schedule. {@link Monitor}s must be explicitly @@ -51,40 +48,18 @@ public class MetricsModule implements Module { private static final Logger log = new Logger(MetricsModule.class); - private final List> monitors = new CopyOnWriteArrayList>(); - public boolean configured = false; - - public MetricsModule register(Class monitorClazz) + public static void register(Binder binder, Class monitorClazz) { - synchronized (monitors) { - Preconditions.checkState(!configured, "Cannot register monitor[%s] after configuration.", monitorClazz); - } - monitors.add(monitorClazz); - return this; - } - - @Inject - public void setProperties(Properties props, JsonConfigurator configurator) - { - final MonitorsConfig config = configurator.configurate( - props, - "druid.monitoring", - MonitorsConfig.class - ); - - for (Class monitorClazz : config.getMonitors()) { - register(monitorClazz); - } + DruidBinders.metricMonitorBinder(binder).addBinding().toInstance(monitorClazz); } @Override public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.monitoring", DruidMonitorSchedulerConfig.class); + JsonConfigProvider.bind(binder, "druid.monitoring", MonitorsConfig.class); - for (Class monitor : monitors) { - binder.bind(monitor).in(LazySingleton.class); - } + DruidBinders.metricMonitorBinder(binder); // get the binder so that it will inject the empty set at a minimum. // Instantiate eagerly so that we get everything registered and put into the Lifecycle binder.bind(Key.get(MonitorScheduler.class, Names.named("ForTheEagerness"))) @@ -96,20 +71,20 @@ public class MetricsModule implements Module @ManageLifecycle public MonitorScheduler getMonitorScheduler( Supplier config, + MonitorsConfig monitorsConfig, + Set> monitorSet, ServiceEmitter emitter, Injector injector ) { List monitors = Lists.newArrayList(); - for (Key key : injector.getBindings().keySet()) { - if (Monitor.class.isAssignableFrom(key.getTypeLiteral().getRawType())) { - final Monitor monitor = (Monitor) injector.getInstance(key); + for (Class monitorClass : Iterables.concat(monitorsConfig.getMonitors(), monitorSet)) { + final Monitor monitor = injector.getInstance(monitorClass); - log.info("Adding monitor[%s]", monitor); + log.info("Adding monitor[%s]", monitor); - monitors.add(monitor); - } + monitors.add(monitor); } return new MonitorScheduler( diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java index d27be088956..e122c5bf580 100644 --- a/services/src/main/java/io/druid/cli/CliBroker.java +++ b/services/src/main/java/io/druid/cli/CliBroker.java @@ -20,34 +20,29 @@ package io.druid.cli; import com.google.common.collect.ImmutableList; -import com.google.inject.Injector; -import com.google.inject.servlet.GuiceFilter; +import com.google.inject.Binder; +import com.google.inject.Module; import com.metamx.common.logger.Logger; import io.airlift.command.Command; +import io.druid.client.BrokerServerView; +import io.druid.client.CachingClusteredClient; +import io.druid.client.TimelineServerView; +import io.druid.client.cache.Cache; import io.druid.client.cache.CacheMonitor; -import io.druid.curator.CuratorModule; +import io.druid.client.cache.CacheProvider; import io.druid.curator.discovery.DiscoveryModule; -import io.druid.guice.BrokerModule; -import io.druid.guice.HttpClientModule; -import io.druid.guice.LifecycleModule; -import io.druid.guice.QueryToolChestModule; -import io.druid.guice.QueryableModule; -import io.druid.guice.ServerModule; -import io.druid.guice.ServerViewModule; -import io.druid.guice.annotations.Client; +import io.druid.guice.Jerseys; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; +import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Self; +import io.druid.query.MapQueryToolChestWarehouse; +import io.druid.query.QuerySegmentWalker; +import io.druid.query.QueryToolChestWarehouse; import io.druid.server.ClientInfoResource; import io.druid.server.ClientQuerySegmentWalker; -import io.druid.server.StatusResource; -import io.druid.server.initialization.EmitterModule; -import io.druid.server.initialization.JettyServerModule; +import io.druid.server.initialization.JettyServerInitializer; import io.druid.server.metrics.MetricsModule; -import org.eclipse.jetty.server.Handler; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.handler.HandlerList; -import org.eclipse.jetty.servlet.DefaultServlet; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; import java.util.List; @@ -70,38 +65,27 @@ public class CliBroker extends ServerRunnable protected List getModules() { return ImmutableList.of( - new LifecycleModule(), - EmitterModule.class, - HttpClientModule.global(), - CuratorModule.class, - new MetricsModule().register(CacheMonitor.class), - new DiscoveryModule().register(Self.class), - new ServerModule(), - new JettyServerModule(new BrokerJettyServerInitializer()) - .addResource(ClientInfoResource.class) - .addResource(StatusResource.class), - new QueryableModule(ClientQuerySegmentWalker.class), - new QueryToolChestModule(), - new ServerViewModule(), - new HttpClientModule("druid.broker.http", Client.class), - new BrokerModule() + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class); + + binder.bind(CachingClusteredClient.class).in(LazySingleton.class); + binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class); + + binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class); + JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class); + + binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class); + binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); + Jerseys.addResource(binder, ClientInfoResource.class); + + DiscoveryModule.register(binder, Self.class); + MetricsModule.register(binder, CacheMonitor.class); + } + } ); } - - private static class BrokerJettyServerInitializer extends QueryJettyServerInitializer - { - @Override - public void initialize(Server server, Injector injector) - { - super.initialize(server, injector); - - final ServletContextHandler resources = new ServletContextHandler(ServletContextHandler.SESSIONS); - resources.addServlet(new ServletHolder(new DefaultServlet()), "/*"); - resources.addFilter(GuiceFilter.class, "/druid/v2/datasources/*", null); - - final HandlerList handlerList = new HandlerList(); - handlerList.setHandlers(new Handler[]{resources}); - server.setHandler(handlerList); - } - } } diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index 027edaf0901..3d27aa10bd0 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -19,42 +19,41 @@ package io.druid.cli; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; -import com.google.inject.Injector; -import com.google.inject.servlet.GuiceFilter; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.logger.Logger; import io.airlift.command.Command; -import io.druid.curator.CuratorModule; +import io.druid.client.indexing.IndexingServiceClient; import io.druid.curator.discovery.DiscoveryModule; -import io.druid.guice.CoordinatorModule; -import io.druid.guice.DbConnectorModule; -import io.druid.guice.HttpClientModule; -import io.druid.guice.IndexingServiceDiscoveryModule; -import io.druid.guice.JacksonConfigManagerModule; +import io.druid.db.DatabaseRuleManager; +import io.druid.db.DatabaseRuleManagerConfig; +import io.druid.db.DatabaseRuleManagerProvider; +import io.druid.db.DatabaseSegmentManager; +import io.druid.db.DatabaseSegmentManagerConfig; +import io.druid.db.DatabaseSegmentManagerProvider; +import io.druid.guice.ConfigProvider; +import io.druid.guice.Jerseys; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; -import io.druid.guice.ServerModule; -import io.druid.guice.ServerViewModule; +import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Self; -import io.druid.server.StatusResource; import io.druid.server.http.BackwardsCompatiableInfoResource; import io.druid.server.http.InfoResource; +import io.druid.server.http.MasterRedirectInfo; import io.druid.server.http.MasterResource; import io.druid.server.http.RedirectFilter; -import io.druid.server.initialization.EmitterModule; +import io.druid.server.http.RedirectInfo; +import io.druid.server.http.RedirectServlet; import io.druid.server.initialization.JettyServerInitializer; -import io.druid.server.initialization.JettyServerModule; import io.druid.server.master.DruidMaster; -import io.druid.server.metrics.MetricsModule; -import org.eclipse.jetty.server.Handler; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.handler.DefaultHandler; -import org.eclipse.jetty.server.handler.HandlerList; -import org.eclipse.jetty.server.handler.ResourceHandler; -import org.eclipse.jetty.servlet.DefaultServlet; -import org.eclipse.jetty.servlet.FilterHolder; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.eclipse.jetty.servlets.GzipFilter; +import io.druid.server.master.DruidMasterConfig; +import io.druid.server.master.LoadQueueTaskMaster; +import org.apache.curator.framework.CuratorFramework; import java.util.List; @@ -77,45 +76,51 @@ public class CliCoordinator extends ServerRunnable protected List getModules() { return ImmutableList.of( - new LifecycleModule().register(DruidMaster.class), - EmitterModule.class, - HttpClientModule.global(), - DbConnectorModule.class, - JacksonConfigManagerModule.class, - CuratorModule.class, - new MetricsModule(), - new DiscoveryModule().register(Self.class), - new ServerModule(), - new JettyServerModule(new CoordinatorJettyServerInitializer()) - .addResource(InfoResource.class) - .addResource(BackwardsCompatiableInfoResource.class) - .addResource(MasterResource.class) - .addResource(StatusResource.class), - new ServerViewModule(), - new IndexingServiceDiscoveryModule(), - CoordinatorModule.class + new Module() + { + @Override + public void configure(Binder binder) + { + ConfigProvider.bind(binder, DruidMasterConfig.class); + + JsonConfigProvider.bind(binder, "druid.manager.segment", DatabaseSegmentManagerConfig.class); + JsonConfigProvider.bind(binder, "druid.manager.rules", DatabaseRuleManagerConfig.class); + + binder.bind(RedirectServlet.class).in(LazySingleton.class); + binder.bind(RedirectFilter.class).in(LazySingleton.class); + + binder.bind(DatabaseSegmentManager.class) + .toProvider(DatabaseSegmentManagerProvider.class) + .in(ManageLifecycle.class); + + binder.bind(DatabaseRuleManager.class) + .toProvider(DatabaseRuleManagerProvider.class) + .in(ManageLifecycle.class); + + binder.bind(IndexingServiceClient.class).in(LazySingleton.class); + + binder.bind(RedirectInfo.class).to(MasterRedirectInfo.class).in(LazySingleton.class); + + binder.bind(DruidMaster.class); + + LifecycleModule.register(binder, DruidMaster.class); + DiscoveryModule.register(binder, Self.class); + + binder.bind(JettyServerInitializer.class).toInstance(new CoordinatorJettyServerInitializer()); + Jerseys.addResource(binder, BackwardsCompatiableInfoResource.class); + Jerseys.addResource(binder, InfoResource.class); + Jerseys.addResource(binder, MasterResource.class); + } + + @Provides + @LazySingleton + public LoadQueueTaskMaster getLoadQueueTaskMaster( + CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, DruidMasterConfig config + ) + { + return new LoadQueueTaskMaster(curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"), config); + } + } ); } - - private static class CoordinatorJettyServerInitializer implements JettyServerInitializer - { - @Override - public void initialize(Server server, Injector injector) - { - ResourceHandler resourceHandler = new ResourceHandler(); - resourceHandler.setResourceBase(DruidMaster.class.getClassLoader().getResource("static").toExternalForm()); - - final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); - root.setContextPath("/"); - - HandlerList handlerList = new HandlerList(); - handlerList.setHandlers(new Handler[]{resourceHandler, root, new DefaultHandler()}); - server.setHandler(handlerList); - - root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); - root.addFilter(GzipFilter.class, "/*", null); - root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null); - root.addFilter(GuiceFilter.class, "/*", null); - } - } } diff --git a/services/src/main/java/io/druid/cli/CliHistorical.java b/services/src/main/java/io/druid/cli/CliHistorical.java index 7b493b639f3..64a18c2e6b4 100644 --- a/services/src/main/java/io/druid/cli/CliHistorical.java +++ b/services/src/main/java/io/druid/cli/CliHistorical.java @@ -20,25 +20,18 @@ package io.druid.cli; import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Module; import com.metamx.common.logger.Logger; import io.airlift.command.Command; -import io.druid.curator.CuratorModule; -import io.druid.guice.AWSModule; -import io.druid.guice.AnnouncerModule; -import io.druid.guice.DataSegmentPullerModule; -import io.druid.guice.DruidProcessingModule; -import io.druid.guice.HistoricalModule; -import io.druid.guice.HttpClientModule; +import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; -import io.druid.guice.QueryRunnerFactoryModule; -import io.druid.guice.QueryableModule; -import io.druid.guice.ServerModule; -import io.druid.guice.StorageNodeModule; -import io.druid.server.StatusResource; +import io.druid.guice.ManageLifecycle; +import io.druid.guice.NodeTypeConfig; +import io.druid.query.QuerySegmentWalker; import io.druid.server.coordination.ServerManager; import io.druid.server.coordination.ZkCoordinator; -import io.druid.server.initialization.EmitterModule; -import io.druid.server.initialization.JettyServerModule; +import io.druid.server.initialization.JettyServerInitializer; import io.druid.server.metrics.MetricsModule; import io.druid.server.metrics.ServerMonitor; @@ -63,22 +56,21 @@ public class CliHistorical extends ServerRunnable protected List getModules() { return ImmutableList.of( - new LifecycleModule().register(ZkCoordinator.class), - EmitterModule.class, - HttpClientModule.global(), - CuratorModule.class, - AnnouncerModule.class, - DruidProcessingModule.class, - AWSModule.class, - DataSegmentPullerModule.class, - new MetricsModule().register(ServerMonitor.class), - new ServerModule(), - new StorageNodeModule("historical"), - new JettyServerModule(new QueryJettyServerInitializer()) - .addResource(StatusResource.class), - new QueryableModule(ServerManager.class), - new QueryRunnerFactoryModule(), - HistoricalModule.class + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(ServerManager.class).in(LazySingleton.class); + binder.bind(ZkCoordinator.class).in(ManageLifecycle.class); + binder.bind(QuerySegmentWalker.class).to(ServerManager.class).in(LazySingleton.class); + + LifecycleModule.register(binder, ZkCoordinator.class); + MetricsModule.register(binder, ServerMonitor.class); + binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("historical")); + binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); + } + } ); } } diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index 7b40d812d41..b9408376074 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -20,32 +20,27 @@ package io.druid.cli; import com.google.common.collect.ImmutableList; -import com.google.inject.Injector; -import com.google.inject.servlet.GuiceFilter; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; import com.metamx.common.logger.Logger; import io.airlift.command.Command; -import io.druid.curator.CuratorModule; -import io.druid.guice.AWSModule; -import io.druid.guice.HttpClientModule; +import io.druid.guice.IndexingServiceModuleHelper; +import io.druid.guice.Jerseys; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; -import io.druid.guice.MiddleManagerModule; -import io.druid.guice.ServerModule; -import io.druid.guice.TaskLogsModule; +import io.druid.guice.ManageLifecycle; +import io.druid.guice.annotations.Self; +import io.druid.indexing.coordinator.ForkingTaskRunner; +import io.druid.indexing.coordinator.TaskRunner; +import io.druid.indexing.worker.Worker; +import io.druid.indexing.worker.WorkerCuratorCoordinator; import io.druid.indexing.worker.WorkerTaskMonitor; +import io.druid.indexing.worker.config.WorkerConfig; import io.druid.indexing.worker.http.WorkerResource; -import io.druid.server.StatusResource; -import io.druid.server.initialization.EmitterModule; +import io.druid.server.DruidNode; import io.druid.server.initialization.JettyServerInitializer; -import io.druid.server.initialization.JettyServerModule; -import io.druid.server.metrics.MetricsModule; -import org.eclipse.jetty.server.Handler; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.handler.DefaultHandler; -import org.eclipse.jetty.server.handler.HandlerList; -import org.eclipse.jetty.servlet.DefaultServlet; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.eclipse.jetty.servlets.GzipFilter; import java.util.List; @@ -68,34 +63,38 @@ public class CliMiddleManager extends ServerRunnable protected List getModules() { return ImmutableList.of( - new LifecycleModule().register(WorkerTaskMonitor.class), - EmitterModule.class, - HttpClientModule.global(), - CuratorModule.class, - new MetricsModule(), - new ServerModule(), - new JettyServerModule(new MiddleManagerJettyServerInitializer()) - .addResource(StatusResource.class) - .addResource(WorkerResource.class), - new AWSModule(), - new TaskLogsModule(), - new MiddleManagerModule() + new Module() + { + @Override + public void configure(Binder binder) + { + IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder); + + JsonConfigProvider.bind(binder, "druid.worker", WorkerConfig.class); + + binder.bind(TaskRunner.class).to(ForkingTaskRunner.class); + binder.bind(ForkingTaskRunner.class).in(LazySingleton.class); + + binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class); + binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class); + + LifecycleModule.register(binder, WorkerTaskMonitor.class); + binder.bind(JettyServerInitializer.class).toInstance(new MiddleManagerJettyServerInitializer()); + Jerseys.addResource(binder, WorkerResource.class); + } + + @Provides + @LazySingleton + public Worker getWorker(@Self DruidNode node, WorkerConfig config) + { + return new Worker( + node.getHost(), + config.getIp(), + config.getCapacity(), + config.getVersion() + ); + } + } ); } - - private static class MiddleManagerJettyServerInitializer implements JettyServerInitializer - { - @Override - public void initialize(Server server, Injector injector) - { - final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); - root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); - root.addFilter(GzipFilter.class, "/*", null); - root.addFilter(GuiceFilter.class, "/*", null); - - final HandlerList handlerList = new HandlerList(); - handlerList.setHandlers(new Handler[]{root, new DefaultHandler()}); - server.setHandler(handlerList); - } - } } diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index a7842673c44..b79003c610e 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -20,28 +20,56 @@ package io.druid.cli; import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.TypeLiteral; +import com.google.inject.multibindings.MapBinder; import com.google.inject.servlet.GuiceFilter; import com.metamx.common.logger.Logger; import io.airlift.command.Command; -import io.druid.curator.CuratorModule; -import io.druid.curator.discovery.DiscoveryModule; -import io.druid.guice.AWSModule; -import io.druid.guice.DbConnectorModule; -import io.druid.guice.HttpClientModule; -import io.druid.guice.JacksonConfigManagerModule; -import io.druid.guice.LifecycleModule; -import io.druid.guice.OverlordModule; -import io.druid.guice.ServerModule; -import io.druid.guice.TaskLogsModule; +import io.druid.guice.IndexingServiceModuleHelper; +import io.druid.guice.JacksonConfigProvider; +import io.druid.guice.Jerseys; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; +import io.druid.guice.ListProvider; +import io.druid.guice.ManageLifecycle; +import io.druid.guice.PolyBind; +import io.druid.indexing.common.actions.LocalTaskActionClientFactory; +import io.druid.indexing.common.actions.TaskActionClientFactory; +import io.druid.indexing.common.actions.TaskActionToolbox; +import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; +import io.druid.indexing.common.tasklogs.TaskLogStreamer; +import io.druid.indexing.common.tasklogs.TaskLogs; +import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer; +import io.druid.indexing.coordinator.DbTaskStorage; +import io.druid.indexing.coordinator.ForkingTaskRunnerFactory; +import io.druid.indexing.coordinator.HeapMemoryTaskStorage; +import io.druid.indexing.coordinator.IndexerDBCoordinator; +import io.druid.indexing.coordinator.RemoteTaskRunnerFactory; +import io.druid.indexing.coordinator.TaskLockbox; import io.druid.indexing.coordinator.TaskMaster; +import io.druid.indexing.coordinator.TaskQueue; +import io.druid.indexing.coordinator.TaskRunnerFactory; +import io.druid.indexing.coordinator.TaskStorage; +import io.druid.indexing.coordinator.TaskStorageQueryAdapter; import io.druid.indexing.coordinator.http.IndexerCoordinatorResource; -import io.druid.server.StatusResource; +import io.druid.indexing.coordinator.http.OverlordRedirectInfo; +import io.druid.indexing.coordinator.scaling.AutoScalingStrategy; +import io.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy; +import io.druid.indexing.coordinator.scaling.NoopAutoScalingStrategy; +import io.druid.indexing.coordinator.scaling.ResourceManagementSchedulerConfig; +import io.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory; +import io.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactoryImpl; +import io.druid.indexing.coordinator.scaling.ResourceManagementStrategy; +import io.druid.indexing.coordinator.scaling.SimpleResourceManagementConfig; +import io.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy; +import io.druid.indexing.coordinator.setup.WorkerSetupData; import io.druid.server.http.RedirectFilter; -import io.druid.server.initialization.EmitterModule; +import io.druid.server.http.RedirectInfo; import io.druid.server.initialization.JettyServerInitializer; -import io.druid.server.initialization.JettyServerModule; -import io.druid.server.metrics.MetricsModule; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.DefaultHandler; @@ -75,24 +103,103 @@ public class CliOverlord extends ServerRunnable protected List getModules() { return ImmutableList.of( - new LifecycleModule(), - EmitterModule.class, - HttpClientModule.global(), - CuratorModule.class, - new MetricsModule(), - new ServerModule(), - new AWSModule(), - new DbConnectorModule(), - new JacksonConfigManagerModule(), - new JettyServerModule(new OverlordJettyServerInitializer()) - .addResource(IndexerCoordinatorResource.class) - .addResource(StatusResource.class), - new DiscoveryModule(), - new TaskLogsModule(), - new OverlordModule() + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(TaskMaster.class).in(ManageLifecycle.class); + + binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class); + binder.bind(new TypeLiteral>(){}) + .toProvider( + new ListProvider() + .add(TaskRunnerTaskLogStreamer.class) + .add(TaskLogs.class) + ) + .in(LazySingleton.class); + + binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class); + binder.bind(TaskActionToolbox.class).in(LazySingleton.class); + binder.bind(TaskQueue.class).in(LazySingleton.class); // Lifecycle managed by TaskMaster instead + binder.bind(IndexerDBCoordinator.class).in(LazySingleton.class); + binder.bind(TaskLockbox.class).in(LazySingleton.class); + binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class); + binder.bind(ResourceManagementSchedulerFactory.class) + .to(ResourceManagementSchedulerFactoryImpl.class) + .in(LazySingleton.class); + + configureTaskStorage(binder); + configureRunners(binder); + configureAutoscale(binder); + + binder.bind(RedirectFilter.class).in(LazySingleton.class); + binder.bind(RedirectInfo.class).to(OverlordRedirectInfo.class).in(LazySingleton.class); + + binder.bind(JettyServerInitializer.class).toInstance(new OverlordJettyServerInitializer()); + Jerseys.addResource(binder, IndexerCoordinatorResource.class); + } + + private void configureTaskStorage(Binder binder) + { + PolyBind.createChoice( + binder, "druid.indexer.storage.type", Key.get(TaskStorage.class), Key.get(HeapMemoryTaskStorage.class) + ); + final MapBinder storageBinder = PolyBind.optionBinder(binder, Key.get(TaskStorage.class)); + + storageBinder.addBinding("local").to(HeapMemoryTaskStorage.class); + binder.bind(HeapMemoryTaskStorage.class).in(LazySingleton.class); + + storageBinder.addBinding("db").to(DbTaskStorage.class); + binder.bind(DbTaskStorage.class).in(LazySingleton.class); + } + + private void configureRunners(Binder binder) + { + PolyBind.createChoice( + binder, "druid.indexer.runner.type", Key.get(TaskRunnerFactory.class), Key.get(ForkingTaskRunnerFactory.class) + ); + final MapBinder biddy = PolyBind.optionBinder(binder, Key.get(TaskRunnerFactory.class)); + + IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder); + biddy.addBinding("local").to(ForkingTaskRunnerFactory.class); + binder.bind(ForkingTaskRunnerFactory.class).in(LazySingleton.class); + + biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class); + binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class); + } + + private void configureAutoscale(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class); + binder.bind(ResourceManagementStrategy.class).to(SimpleResourceManagementStrategy.class).in(LazySingleton.class); + + JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null); + + PolyBind.createChoice( + binder, + "druid.indexer.autoscale.strategy", + Key.get(AutoScalingStrategy.class), + Key.get(NoopAutoScalingStrategy.class) + ); + + final MapBinder autoScalingBinder = PolyBind.optionBinder( + binder, Key.get(AutoScalingStrategy.class) + ); + autoScalingBinder.addBinding("ec2").to(EC2AutoScalingStrategy.class); + binder.bind(EC2AutoScalingStrategy.class).in(LazySingleton.class); + + autoScalingBinder.addBinding("noop").to(NoopAutoScalingStrategy.class); + binder.bind(NoopAutoScalingStrategy.class).in(LazySingleton.class); + + JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class); + } + } ); } + /** + */ private static class OverlordJettyServerInitializer implements JettyServerInitializer { @Override diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 9962e6d93ce..1d5e1035232 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -21,41 +21,47 @@ package io.druid.cli; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; import com.google.inject.Inject; import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.multibindings.MapBinder; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import io.airlift.command.Arguments; import io.airlift.command.Command; import io.airlift.command.Option; -import io.druid.curator.CuratorModule; -import io.druid.curator.discovery.DiscoveryModule; -import io.druid.guice.AWSModule; -import io.druid.guice.AnnouncerModule; -import io.druid.guice.DataSegmentPullerModule; -import io.druid.guice.DataSegmentPusherModule; -import io.druid.guice.DruidProcessingModule; -import io.druid.guice.HttpClientModule; -import io.druid.guice.IndexingServiceDiscoveryModule; -import io.druid.guice.LifecycleModule; -import io.druid.guice.PeonModule; -import io.druid.guice.QueryRunnerFactoryModule; -import io.druid.guice.QueryableModule; -import io.druid.guice.ServerModule; -import io.druid.guice.ServerViewModule; -import io.druid.guice.StorageNodeModule; +import io.druid.guice.Jerseys; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; +import io.druid.guice.ManageLifecycle; +import io.druid.guice.NodeTypeConfig; +import io.druid.guice.PolyBind; +import io.druid.indexing.common.RetryPolicyConfig; +import io.druid.indexing.common.RetryPolicyFactory; +import io.druid.indexing.common.TaskToolboxFactory; +import io.druid.indexing.common.actions.RemoteTaskActionClientFactory; +import io.druid.indexing.common.actions.TaskActionClientFactory; +import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.index.ChatHandlerProvider; +import io.druid.indexing.common.index.EventReceivingChatHandlerProvider; +import io.druid.indexing.common.index.NoopChatHandlerProvider; +import io.druid.indexing.coordinator.TaskRunner; import io.druid.indexing.coordinator.ThreadPoolTaskRunner; import io.druid.indexing.worker.executor.ChatHandlerResource; import io.druid.indexing.worker.executor.ExecutorLifecycle; import io.druid.indexing.worker.executor.ExecutorLifecycleConfig; import io.druid.initialization.LogLevelAdjuster; -import io.druid.server.StatusResource; -import io.druid.server.initialization.EmitterModule; -import io.druid.server.initialization.Initialization; -import io.druid.server.initialization.JettyServerModule; -import io.druid.server.metrics.MetricsModule; +import io.druid.query.QuerySegmentWalker; +import io.druid.segment.loading.DataSegmentKiller; +import io.druid.segment.loading.S3DataSegmentKiller; +import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.segment.loading.StorageLocationConfig; +import io.druid.server.initialization.JettyServerInitializer; import java.io.File; +import java.util.Arrays; import java.util.List; /** @@ -87,32 +93,59 @@ public class CliPeon implements Runnable { return Initialization.makeInjectorWithModules( injector, - ImmutableList.of( - new LifecycleModule(), - EmitterModule.class, - HttpClientModule.global(), - CuratorModule.class, - new MetricsModule(), - new ServerModule(), - new JettyServerModule(new QueryJettyServerInitializer()) - .addResource(StatusResource.class) - .addResource(ChatHandlerResource.class), - new DiscoveryModule(), - new ServerViewModule(), - new StorageNodeModule(nodeType), - new DataSegmentPullerModule(), - new DataSegmentPusherModule(), - new AnnouncerModule(), - new DruidProcessingModule(), - new QueryableModule(ThreadPoolTaskRunner.class), - new QueryRunnerFactoryModule(), - new IndexingServiceDiscoveryModule(), - new AWSModule(), - new PeonModule( - new ExecutorLifecycleConfig() - .setTaskFile(new File(taskAndStatusFile.get(0))) - .setStatusFile(new File(taskAndStatusFile.get(1))) - ) + ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + PolyBind.createChoice( + binder, + "druid.indexer.task.chathandler.type", + Key.get(ChatHandlerProvider.class), + Key.get(NoopChatHandlerProvider.class) + ); + final MapBinder handlerProviderBinder = PolyBind.optionBinder( + binder, Key.get(ChatHandlerProvider.class) + ); + handlerProviderBinder.addBinding("curator").to(EventReceivingChatHandlerProvider.class); + handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class); + + binder.bind(TaskToolboxFactory.class).in(LazySingleton.class); + + JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class); + JsonConfigProvider.bind(binder, "druid.worker.taskActionClient.retry", RetryPolicyConfig.class); + + binder.bind(TaskActionClientFactory.class) + .to(RemoteTaskActionClientFactory.class) + .in(LazySingleton.class); + binder.bind(RetryPolicyFactory.class).in(LazySingleton.class); + + binder.bind(DataSegmentKiller.class).to(S3DataSegmentKiller.class).in(LazySingleton.class); + + binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class); + binder.bind(ExecutorLifecycleConfig.class).toInstance( + new ExecutorLifecycleConfig() + .setTaskFile(new File(taskAndStatusFile.get(0))) + .setStatusFile(new File(taskAndStatusFile.get(1))) + ); + + binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class); + binder.bind(QuerySegmentWalker.class).to(ThreadPoolTaskRunner.class); + binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class); + + // Override the default SegmentLoaderConfig because we don't actually care about the + // configuration based locations. This will override them anyway. This is also stopping + // configuration of other parameters, but I don't think that's actually a problem. + // Note, if that is actually not a problem, then that probably means we have the wrong abstraction. + binder.bind(SegmentLoaderConfig.class) + .toInstance(new SegmentLoaderConfig().withLocations(Arrays.asList())); + + binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class); + Jerseys.addResource(binder, ChatHandlerResource.class); + binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(nodeType)); + } + } ) ); } diff --git a/services/src/main/java/io/druid/cli/CliRealtime.java b/services/src/main/java/io/druid/cli/CliRealtime.java index c6eab365cba..9677016ac0b 100644 --- a/services/src/main/java/io/druid/cli/CliRealtime.java +++ b/services/src/main/java/io/druid/cli/CliRealtime.java @@ -22,25 +22,7 @@ package io.druid.cli; import com.google.common.collect.ImmutableList; import com.metamx.common.logger.Logger; import io.airlift.command.Command; -import io.druid.curator.CuratorModule; -import io.druid.guice.AWSModule; -import io.druid.guice.AnnouncerModule; -import io.druid.guice.DataSegmentPusherModule; -import io.druid.guice.DbConnectorModule; -import io.druid.guice.DruidProcessingModule; -import io.druid.guice.HttpClientModule; -import io.druid.guice.LifecycleModule; -import io.druid.guice.QueryRunnerFactoryModule; -import io.druid.guice.QueryableModule; import io.druid.guice.RealtimeModule; -import io.druid.guice.ServerModule; -import io.druid.guice.ServerViewModule; -import io.druid.guice.StorageNodeModule; -import io.druid.segment.realtime.RealtimeManager; -import io.druid.server.StatusResource; -import io.druid.server.initialization.EmitterModule; -import io.druid.server.initialization.JettyServerModule; -import io.druid.server.metrics.MetricsModule; import java.util.List; @@ -63,24 +45,7 @@ public class CliRealtime extends ServerRunnable protected List getModules() { return ImmutableList.of( - new LifecycleModule(), - EmitterModule.class, - DbConnectorModule.class, - HttpClientModule.global(), - CuratorModule.class, - AnnouncerModule.class, - DruidProcessingModule.class, - AWSModule.class, - DataSegmentPusherModule.class, - new MetricsModule(), - new ServerModule(), - new StorageNodeModule("realtime"), - new JettyServerModule(new QueryJettyServerInitializer()) - .addResource(StatusResource.class), - new ServerViewModule(), - new QueryableModule(RealtimeManager.class), - new QueryRunnerFactoryModule(), - RealtimeModule.class + new RealtimeModule() ); } } diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index 3af6b116e6a..0dcbfe6d99b 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -19,22 +19,33 @@ package io.druid.cli; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; import com.metamx.common.logger.Logger; -import druid.examples.guice.RealtimeExampleModule; +import druid.examples.flights.FlightsFirehoseFactory; +import druid.examples.rand.RandomFirehoseFactory; +import druid.examples.twitter.TwitterSpritzerFirehoseFactory; +import druid.examples.web.WebFirehoseFactory; import io.airlift.command.Command; -import io.druid.guice.DruidProcessingModule; -import io.druid.guice.LifecycleModule; -import io.druid.guice.QueryRunnerFactoryModule; -import io.druid.guice.QueryableModule; -import io.druid.guice.ServerModule; -import io.druid.guice.StorageNodeModule; -import io.druid.segment.realtime.RealtimeManager; -import io.druid.server.StatusResource; -import io.druid.server.initialization.EmitterModule; -import io.druid.server.initialization.JettyServerModule; +import io.druid.client.DruidServer; +import io.druid.client.InventoryView; +import io.druid.client.ServerView; +import io.druid.guice.NoopSegmentPublisherProvider; +import io.druid.guice.RealtimeModule; +import io.druid.initialization.DruidModule; +import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.realtime.SegmentPublisher; +import io.druid.server.coordination.DataSegmentAnnouncer; +import io.druid.timeline.DataSegment; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; import java.util.List; +import java.util.concurrent.Executor; /** */ @@ -54,17 +65,104 @@ public class CliRealtimeExample extends ServerRunnable @Override protected List getModules() { - return ImmutableList.of( - new LifecycleModule(), - EmitterModule.class, - DruidProcessingModule.class, - new ServerModule(), - new StorageNodeModule("realtime"), - new JettyServerModule(new QueryJettyServerInitializer()) - .addResource(StatusResource.class), - new QueryableModule(RealtimeManager.class), - new QueryRunnerFactoryModule(), - RealtimeExampleModule.class + return ImmutableList.of( + new RealtimeModule(), + new DruidModule() + { + @Override + public void configure(Binder binder) + { + binder.bind(SegmentPublisher.class).toProvider(NoopSegmentPublisherProvider.class); + binder.bind(DataSegmentPusher.class).to(NoopDataSegmentPusher.class); + binder.bind(DataSegmentAnnouncer.class).to(NoopDataSegmentAnnouncer.class); + binder.bind(InventoryView.class).to(NoopInventoryView.class); + binder.bind(ServerView.class).to(NoopServerView.class); + } + + @Override + public List getJacksonModules() + { + return Arrays.asList( + new SimpleModule("RealtimeExampleModule") + .registerSubtypes( + new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), + new NamedType(FlightsFirehoseFactory.class, "flights"), + new NamedType(RandomFirehoseFactory.class, "rand"), + new NamedType(WebFirehoseFactory.class, "webstream") + ) + ); + } + } ); } + + private static class NoopServerView implements ServerView + { + @Override + public void registerServerCallback( + Executor exec, ServerCallback callback + ) + { + // do nothing + } + + @Override + public void registerSegmentCallback( + Executor exec, SegmentCallback callback + ) + { + // do nothing + } + } + + private static class NoopInventoryView implements InventoryView + { + @Override + public DruidServer getInventoryValue(String string) + { + return null; + } + + @Override + public Iterable getInventory() + { + return ImmutableList.of(); + } + } + + private static class NoopDataSegmentPusher implements DataSegmentPusher + { + @Override + public DataSegment push(File file, DataSegment segment) throws IOException + { + return segment; + } + } + + private static class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer + { + @Override + public void announceSegment(DataSegment segment) throws IOException + { + // do nothing + } + + @Override + public void unannounceSegment(DataSegment segment) throws IOException + { + // do nothing + } + + @Override + public void announceSegments(Iterable segments) throws IOException + { + // do nothing + } + + @Override + public void unannounceSegments(Iterable segments) throws IOException + { + // do nothing + } + } } diff --git a/services/src/main/java/io/druid/cli/CoordinatorJettyServerInitializer.java b/services/src/main/java/io/druid/cli/CoordinatorJettyServerInitializer.java new file mode 100644 index 00000000000..391ad699594 --- /dev/null +++ b/services/src/main/java/io/druid/cli/CoordinatorJettyServerInitializer.java @@ -0,0 +1,60 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.cli; + +import com.google.inject.Injector; +import com.google.inject.servlet.GuiceFilter; +import io.druid.server.http.RedirectFilter; +import io.druid.server.initialization.JettyServerInitializer; +import io.druid.server.master.DruidMaster; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.server.handler.ResourceHandler; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.FilterHolder; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.servlets.GzipFilter; + +/** +*/ +class CoordinatorJettyServerInitializer implements JettyServerInitializer +{ + @Override + public void initialize(Server server, Injector injector) + { + ResourceHandler resourceHandler = new ResourceHandler(); + resourceHandler.setResourceBase(DruidMaster.class.getClassLoader().getResource("static").toExternalForm()); + + final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); + root.setContextPath("/"); + + HandlerList handlerList = new HandlerList(); + handlerList.setHandlers(new Handler[]{resourceHandler, root, new DefaultHandler()}); + server.setHandler(handlerList); + + root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); + root.addFilter(GzipFilter.class, "/*", null); + root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null); + root.addFilter(GuiceFilter.class, "/*", null); + } +} diff --git a/server/src/main/java/io/druid/server/initialization/Initialization.java b/services/src/main/java/io/druid/cli/Initialization.java similarity index 80% rename from server/src/main/java/io/druid/server/initialization/Initialization.java rename to services/src/main/java/io/druid/cli/Initialization.java index 96b20ae077a..4d5b40c87d0 100644 --- a/server/src/main/java/io/druid/server/initialization/Initialization.java +++ b/services/src/main/java/io/druid/cli/Initialization.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.server.initialization; +package io.druid.cli; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; @@ -29,12 +29,35 @@ import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; +import com.google.inject.util.Modules; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; +import io.druid.curator.CuratorModule; +import io.druid.curator.discovery.DiscoveryModule; +import io.druid.guice.AWSModule; +import io.druid.guice.AnnouncerModule; +import io.druid.guice.DataSegmentPusherPullerModule; +import io.druid.guice.DbConnectorModule; +import io.druid.guice.DruidProcessingModule; import io.druid.guice.DruidSecondaryModule; +import io.druid.guice.HttpClientModule; +import io.druid.guice.IndexingServiceDiscoveryModule; +import io.druid.guice.JacksonConfigManagerModule; +import io.druid.guice.LifecycleModule; +import io.druid.guice.QueryRunnerFactoryModule; +import io.druid.guice.QueryableModule; +import io.druid.guice.ServerModule; +import io.druid.guice.ServerViewModule; +import io.druid.guice.StorageNodeModule; +import io.druid.guice.TaskLogsModule; +import io.druid.guice.annotations.Client; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.initialization.DruidModule; +import io.druid.server.initialization.EmitterModule; +import io.druid.server.initialization.ExtensionsConfig; +import io.druid.server.initialization.JettyServerModule; +import io.druid.server.metrics.MetricsModule; import io.tesla.aether.TeslaAether; import io.tesla.aether.internal.DefaultTeslaAether; import org.eclipse.aether.artifact.Artifact; @@ -171,8 +194,33 @@ public class Initialization } } - public static Injector makeInjectorWithModules(final Injector baseInjector, List modules) + public static Injector makeInjectorWithModules(final Injector baseInjector, Iterable modules) { + final ModuleList defaultModules = new ModuleList(baseInjector); + defaultModules.addModules( + new LifecycleModule(), + EmitterModule.class, + HttpClientModule.global(), + new HttpClientModule("druid.broker.http", Client.class), + new CuratorModule(), + new AnnouncerModule(), + new DruidProcessingModule(), + new AWSModule(), + new MetricsModule(), + new ServerModule(), + new StorageNodeModule(), + new JettyServerModule(), + new QueryableModule(), + new QueryRunnerFactoryModule(), + new DiscoveryModule(), + new ServerViewModule(), + new DbConnectorModule(), + new JacksonConfigManagerModule(), + new IndexingServiceDiscoveryModule(), + new DataSegmentPusherPullerModule(), + new TaskLogsModule() + ); + ModuleList actualModules = new ModuleList(baseInjector); actualModules.addModule(DruidSecondaryModule.class); for (Object module : modules) { @@ -184,7 +232,7 @@ public class Initialization actualModules.addModule(module); } - return Guice.createInjector(actualModules.getModules()); + return Guice.createInjector(Modules.override(defaultModules.getModules()).with(actualModules.getModules())); } private static class ModuleList @@ -233,6 +281,13 @@ public class Initialization } } + public void addModules(Object... object) + { + for (Object o : object) { + addModule(o); + } + } + private DruidModule registerJacksonModules(DruidModule module) { for (com.fasterxml.jackson.databind.Module jacksonModule : module.getJacksonModules()) { diff --git a/services/src/main/java/io/druid/cli/Main.java b/services/src/main/java/io/druid/cli/Main.java index c0d2925b68b..3c39a82d0ed 100644 --- a/services/src/main/java/io/druid/cli/Main.java +++ b/services/src/main/java/io/druid/cli/Main.java @@ -33,7 +33,6 @@ import io.druid.guice.JsonConfigProvider; import io.druid.jackson.JacksonModule; import io.druid.server.initialization.ConfigModule; import io.druid.server.initialization.ExtensionsConfig; -import io.druid.server.initialization.Initialization; import io.druid.server.initialization.PropertiesModule; import java.util.List; diff --git a/services/src/main/java/io/druid/cli/MiddleManagerJettyServerInitializer.java b/services/src/main/java/io/druid/cli/MiddleManagerJettyServerInitializer.java new file mode 100644 index 00000000000..965ee7a2be0 --- /dev/null +++ b/services/src/main/java/io/druid/cli/MiddleManagerJettyServerInitializer.java @@ -0,0 +1,50 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.cli; + +import com.google.inject.Injector; +import com.google.inject.servlet.GuiceFilter; +import io.druid.server.initialization.JettyServerInitializer; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.servlets.GzipFilter; + +/** +*/ +class MiddleManagerJettyServerInitializer implements JettyServerInitializer +{ + @Override + public void initialize(Server server, Injector injector) + { + final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); + root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); + root.addFilter(GzipFilter.class, "/*", null); + root.addFilter(GuiceFilter.class, "/*", null); + + final HandlerList handlerList = new HandlerList(); + handlerList.setHandlers(new Handler[]{root, new DefaultHandler()}); + server.setHandler(handlerList); + } +} diff --git a/services/src/main/java/io/druid/cli/ServerRunnable.java b/services/src/main/java/io/druid/cli/ServerRunnable.java index aa2508d6007..e4ea30b1a70 100644 --- a/services/src/main/java/io/druid/cli/ServerRunnable.java +++ b/services/src/main/java/io/druid/cli/ServerRunnable.java @@ -25,7 +25,6 @@ import com.google.inject.Injector; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import io.druid.initialization.LogLevelAdjuster; -import io.druid.server.initialization.Initialization; import java.util.List; @@ -56,7 +55,9 @@ public abstract class ServerRunnable implements Runnable try { LogLevelAdjuster.register(); - final Injector injector = Initialization.makeInjectorWithModules(baseInjector, getModules()); + final Injector injector = Initialization.makeInjectorWithModules( + baseInjector, getModules() + ); final Lifecycle lifecycle = injector.getInstance(Lifecycle.class); try { diff --git a/services/src/main/java/io/druid/cli/convert/ConvertProperties.java b/services/src/main/java/io/druid/cli/convert/ConvertProperties.java index 78db82ae663..da7e14e4122 100644 --- a/services/src/main/java/io/druid/cli/convert/ConvertProperties.java +++ b/services/src/main/java/io/druid/cli/convert/ConvertProperties.java @@ -98,7 +98,11 @@ public class ConvertProperties implements Runnable new Rename("druid.indexer.rowFlushBoundary", "druid.indexer.task.rowFlushBoundary"), new Rename("druid.worker.taskActionClient.retry.minWaitMillis", "druid.worker.taskActionClient.retry.minWait"), new Rename("druid.worker.taskActionClient.retry.maxWaitMillis", "druid.worker.taskActionClient.retry.maxWait"), - new Rename("druid.master.merger.service", "druid.selectors.indexing.serviceName") + new Rename("druid.master.merger.service", "druid.selectors.indexing.serviceName"), + new DataSegmentPusherDefaultConverter(), + new Rename("druid.pusher.hdfs.storageDirectory", "druid.pusher.storageDirectory"), + new Rename("druid.pusher.cassandra.host", "druid.pusher.host"), + new Rename("druid.pusher.cassandra.keySpace", "druid.pusher.keySpace") ); @Option(name = "-f", title = "file", description = "The properties file to convert", required = true) @@ -150,6 +154,10 @@ public class ConvertProperties implements Runnable } } + updatedProps.setProperty( + "druid.monitoring.monitors", "[\"io.druid.server.metrics.ServerMonitor\", \"com.metamx.metrics.SysMonitor\"]" + ); + try (Writer out = new OutputStreamWriter(new FileOutputStream(outFile), Charsets.UTF_8)) { updatedProps.store(out, null); diff --git a/services/src/main/java/io/druid/cli/convert/DataSegmentPusherDefaultConverter.java b/services/src/main/java/io/druid/cli/convert/DataSegmentPusherDefaultConverter.java new file mode 100644 index 00000000000..f6a830f5057 --- /dev/null +++ b/services/src/main/java/io/druid/cli/convert/DataSegmentPusherDefaultConverter.java @@ -0,0 +1,70 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.cli.convert; + +import com.google.api.client.util.Maps; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; + +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + */ +public class DataSegmentPusherDefaultConverter implements PropertyConverter +{ + Set propertiesHandled = Sets.newHashSet("druid.pusher.local", "druid.pusher.cassandra", "druid.pusher.hdfs"); + + @Override + public boolean canHandle(String property) + { + return propertiesHandled.contains(property) || property.startsWith("druid.pusher.s3"); + } + + @Override + public Map convert(Properties props) + { + String type = null; + if (Boolean.parseBoolean(props.getProperty("druid.pusher.local", "false"))) { + type = "local"; + } + else if (Boolean.parseBoolean(props.getProperty("druid.pusher.cassandra", "false"))) { + type = "c*"; + } + else if (Boolean.parseBoolean(props.getProperty("druid.pusher.hdfs", "false"))) { + type = "hdfs"; + } + + if (type != null) { + return ImmutableMap.of("druid.pusher.type", type); + } + + // It's an s3 property, which means we need to set the type and convert the other values. + Map retVal = Maps.newHashMap(); + + retVal.put("druid.pusher.type", type); + retVal.putAll(new Rename("druid.pusher.s3.bucket", "druid.pusher.bucket").convert(props)); + retVal.putAll(new Rename("druid.pusher.s3.baseKey", "druid.pusher.baseKey").convert(props)); + retVal.putAll(new Rename("druid.pusher.s3.disableAcl", "druid.pusher.disableAcl").convert(props)); + + return retVal; + } +} diff --git a/realtime/src/main/java/io/druid/guice/RealtimeModule.java b/services/src/main/java/io/druid/guice/RealtimeModule.java similarity index 72% rename from realtime/src/main/java/io/druid/guice/RealtimeModule.java rename to services/src/main/java/io/druid/guice/RealtimeModule.java index bba4a7d6240..177b67684b4 100644 --- a/realtime/src/main/java/io/druid/guice/RealtimeModule.java +++ b/services/src/main/java/io/druid/guice/RealtimeModule.java @@ -19,27 +19,27 @@ package io.druid.guice; +import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; -import com.google.inject.Module; import com.google.inject.TypeLiteral; -import com.metamx.common.logger.Logger; +import io.druid.cli.QueryJettyServerInitializer; import io.druid.initialization.DruidModule; +import io.druid.query.QuerySegmentWalker; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.RealtimeManager; import io.druid.segment.realtime.SegmentPublisher; import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; +import io.druid.server.initialization.JettyServerInitializer; import java.util.Arrays; import java.util.List; /** - */ +*/ public class RealtimeModule implements DruidModule { - private static final Logger log = new Logger(RealtimeModule.class); - @Override public void configure(Binder binder) { @@ -47,18 +47,19 @@ public class RealtimeModule implements DruidModule binder.bind(SegmentPublisher.class).toProvider(SegmentPublisherProvider.class); JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class); - binder.bind( - new TypeLiteral>() - { - } - ).toProvider(FireDepartmentsProvider.class).in(LazySingleton.class); - binder.bind(RealtimeManager.class).in(ManageLifecycle.class); + binder.bind(new TypeLiteral>(){}) + .toProvider(FireDepartmentsProvider.class) + .in(LazySingleton.class); + + binder.bind(QuerySegmentWalker.class).to(RealtimeManager.class).in(ManageLifecycle.class); + binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("realtime")); + binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); } @Override - public List getJacksonModules() + public List getJacksonModules() { - return Arrays.asList( + return Arrays.asList( new SimpleModule("RealtimeModule") .registerSubtypes( new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2")