From 2e9c46867f5455c581fc064ddb8076be15696ba8 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 18 Oct 2013 11:14:33 -0700 Subject: [PATCH] Fixes for indexing service. - Create IndexingServiceFirehoseModule so firehoses can be loaded by all mains - Fix implicit lock acquisition in AbstractTask --- .../guice/IndexingServiceFirehoseModule.java | 30 +++++++++++++++++++ .../indexing/common/task/AbstractTask.java | 14 ++++----- .../java/io/druid/cli/CliMiddleManager.java | 4 ++- .../main/java/io/druid/cli/CliOverlord.java | 4 ++- .../src/main/java/io/druid/cli/CliPeon.java | 19 ++++-------- 5 files changed, 47 insertions(+), 24 deletions(-) create mode 100644 indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java diff --git a/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java b/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java new file mode 100644 index 00000000000..3698396d22d --- /dev/null +++ b/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java @@ -0,0 +1,30 @@ +package io.druid.guice; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import io.druid.indexing.common.index.EventReceiverFirehoseFactory; +import io.druid.initialization.DruidModule; + +import java.util.List; + +public class IndexingServiceFirehoseModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule("IndexingServiceFirehoseModule") + .registerSubtypes( + new NamedType(EventReceiverFirehoseFactory.class, "receiver") + ) + ); + } + + @Override + public void configure(Binder binder) + { + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java index 7905776011a..1944243e7fe 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java @@ -37,7 +37,6 @@ import io.druid.query.QueryRunner; import org.joda.time.Interval; import java.io.IOException; -import java.util.Arrays; import java.util.List; public abstract class AbstractTask implements Task @@ -189,13 +188,12 @@ public abstract class AbstractTask implements Task { final List locks = toolbox.getTaskActionClient().submit(new LockListAction()); - if (locks.isEmpty()) { - return Arrays.asList( - toolbox.getTaskActionClient() - .submit(new LockAcquireAction(getImplicitLockInterval().get())) - ); + if (locks.isEmpty() && getImplicitLockInterval().isPresent()) { + // In the Peon's local mode, the implicit lock interval is not pre-acquired, so we need to try it here. + toolbox.getTaskActionClient().submit(new LockAcquireAction(getImplicitLockInterval().get())); + return toolbox.getTaskActionClient().submit(new LockListAction()); + } else { + return locks; } - - return locks; } } diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index d4fcb5cf0fa..95319e7ee55 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -26,6 +26,7 @@ import com.google.inject.Provides; import com.google.inject.util.Providers; import com.metamx.common.logger.Logger; import io.airlift.command.Command; +import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; @@ -101,7 +102,8 @@ public class CliMiddleManager extends ServerRunnable config.getVersion() ); } - } + }, + new IndexingServiceFirehoseModule() ); } } diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index fa0ed6d597e..9fb298f3d59 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -30,6 +30,7 @@ import com.google.inject.servlet.GuiceFilter; import com.google.inject.util.Providers; import com.metamx.common.logger.Logger; import io.airlift.command.Command; +import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.JacksonConfigProvider; import io.druid.guice.Jerseys; @@ -206,7 +207,8 @@ public class CliOverlord extends ServerRunnable JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class); } - } + }, + new IndexingServiceFirehoseModule() ); } diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index bcc0d35b6fc..d9613057db5 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -19,13 +19,12 @@ package io.druid.cli; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.Key; +import com.google.inject.Module; import com.google.inject.multibindings.MapBinder; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; @@ -33,6 +32,7 @@ import io.airlift.command.Arguments; import io.airlift.command.Command; import io.airlift.command.Option; import io.druid.guice.Binders; +import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; @@ -49,7 +49,6 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.index.ChatHandlerProvider; -import io.druid.indexing.common.index.EventReceiverFirehoseFactory; import io.druid.indexing.common.index.NoopChatHandlerProvider; import io.druid.indexing.common.index.ServiceAnnouncingChatHandlerProvider; import io.druid.indexing.overlord.HeapMemoryTaskStorage; @@ -100,7 +99,7 @@ public class CliPeon extends GuiceRunnable protected List getModules() { return ImmutableList.of( - new DruidModule() + new Module() { @Override public void configure(Binder binder) @@ -179,16 +178,8 @@ public class CliPeon extends GuiceRunnable .to(RemoteTaskActionClientFactory.class).in(LazySingleton.class); } - - @Override - public List getJacksonModules() - { - return Arrays.asList( - new SimpleModule("PeonModule") - .registerSubtypes(new NamedType(EventReceiverFirehoseFactory.class, "receiver")) - ); - } - } + }, + new IndexingServiceFirehoseModule() ); }