Fixes for indexing service.

- Create IndexingServiceFirehoseModule so firehoses can be loaded by all mains
- Fix implicit lock acquisition in AbstractTask
This commit is contained in:
Gian Merlino 2013-10-18 11:14:33 -07:00
parent 5bec1cd881
commit 2e9c46867f
5 changed files with 47 additions and 24 deletions

View File

@ -0,0 +1,30 @@
package io.druid.guice;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.initialization.DruidModule;
import java.util.List;
public class IndexingServiceFirehoseModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.<Module>of(
new SimpleModule("IndexingServiceFirehoseModule")
.registerSubtypes(
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
)
);
}
@Override
public void configure(Binder binder)
{
}
}

View File

@ -37,7 +37,6 @@ import io.druid.query.QueryRunner;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.List; import java.util.List;
public abstract class AbstractTask implements Task public abstract class AbstractTask implements Task
@ -189,13 +188,12 @@ public abstract class AbstractTask implements Task
{ {
final List<TaskLock> locks = toolbox.getTaskActionClient().submit(new LockListAction()); final List<TaskLock> locks = toolbox.getTaskActionClient().submit(new LockListAction());
if (locks.isEmpty()) { if (locks.isEmpty() && getImplicitLockInterval().isPresent()) {
return Arrays.asList( // In the Peon's local mode, the implicit lock interval is not pre-acquired, so we need to try it here.
toolbox.getTaskActionClient() toolbox.getTaskActionClient().submit(new LockAcquireAction(getImplicitLockInterval().get()));
.submit(new LockAcquireAction(getImplicitLockInterval().get())) return toolbox.getTaskActionClient().submit(new LockListAction());
); } else {
}
return locks; return locks;
} }
} }
}

View File

@ -26,6 +26,7 @@ import com.google.inject.Provides;
import com.google.inject.util.Providers; import com.google.inject.util.Providers;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.airlift.command.Command; import io.airlift.command.Command;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.Jerseys; import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider; import io.druid.guice.JsonConfigProvider;
@ -101,7 +102,8 @@ public class CliMiddleManager extends ServerRunnable
config.getVersion() config.getVersion()
); );
} }
} },
new IndexingServiceFirehoseModule()
); );
} }
} }

View File

@ -30,6 +30,7 @@ import com.google.inject.servlet.GuiceFilter;
import com.google.inject.util.Providers; import com.google.inject.util.Providers;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.airlift.command.Command; import io.airlift.command.Command;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.JacksonConfigProvider; import io.druid.guice.JacksonConfigProvider;
import io.druid.guice.Jerseys; import io.druid.guice.Jerseys;
@ -206,7 +207,8 @@ public class CliOverlord extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class);
} }
} },
new IndexingServiceFirehoseModule()
); );
} }

View File

@ -19,13 +19,12 @@
package io.druid.cli; package io.druid.cli;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Injector; import com.google.inject.Injector;
import com.google.inject.Key; import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder; import com.google.inject.multibindings.MapBinder;
import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
@ -33,6 +32,7 @@ import io.airlift.command.Arguments;
import io.airlift.command.Command; import io.airlift.command.Command;
import io.airlift.command.Option; import io.airlift.command.Option;
import io.druid.guice.Binders; import io.druid.guice.Binders;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.Jerseys; import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider; import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton; import io.druid.guice.LazySingleton;
@ -49,7 +49,6 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.index.ChatHandlerProvider; import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.NoopChatHandlerProvider; import io.druid.indexing.common.index.NoopChatHandlerProvider;
import io.druid.indexing.common.index.ServiceAnnouncingChatHandlerProvider; import io.druid.indexing.common.index.ServiceAnnouncingChatHandlerProvider;
import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.HeapMemoryTaskStorage;
@ -100,7 +99,7 @@ public class CliPeon extends GuiceRunnable
protected List<Object> getModules() protected List<Object> getModules()
{ {
return ImmutableList.<Object>of( return ImmutableList.<Object>of(
new DruidModule() new Module()
{ {
@Override @Override
public void configure(Binder binder) public void configure(Binder binder)
@ -179,16 +178,8 @@ public class CliPeon extends GuiceRunnable
.to(RemoteTaskActionClientFactory.class).in(LazySingleton.class); .to(RemoteTaskActionClientFactory.class).in(LazySingleton.class);
} }
},
@Override new IndexingServiceFirehoseModule()
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return Arrays.asList(
new SimpleModule("PeonModule")
.registerSubtypes(new NamedType(EventReceiverFirehoseFactory.class, "receiver"))
);
}
}
); );
} }