Merge pull request #271 from metamx/indexing-service-realtime

Indexing service fixes, targeted at RealtimeIndexTask
This commit is contained in:
fjy 2013-10-18 13:28:04 -07:00
commit b3984f4e2e
5 changed files with 66 additions and 24 deletions

View File

@ -0,0 +1,49 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.initialization.DruidModule;
import java.util.List;
public class IndexingServiceFirehoseModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.<Module>of(
new SimpleModule("IndexingServiceFirehoseModule")
.registerSubtypes(
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
)
);
}
@Override
public void configure(Binder binder)
{
}
}

View File

@ -37,7 +37,6 @@ import io.druid.query.QueryRunner;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.List; import java.util.List;
public abstract class AbstractTask implements Task public abstract class AbstractTask implements Task
@ -189,13 +188,12 @@ public abstract class AbstractTask implements Task
{ {
final List<TaskLock> locks = toolbox.getTaskActionClient().submit(new LockListAction()); final List<TaskLock> locks = toolbox.getTaskActionClient().submit(new LockListAction());
if (locks.isEmpty()) { if (locks.isEmpty() && getImplicitLockInterval().isPresent()) {
return Arrays.asList( // In the Peon's local mode, the implicit lock interval is not pre-acquired, so we need to try it here.
toolbox.getTaskActionClient() toolbox.getTaskActionClient().submit(new LockAcquireAction(getImplicitLockInterval().get()));
.submit(new LockAcquireAction(getImplicitLockInterval().get())) return toolbox.getTaskActionClient().submit(new LockListAction());
); } else {
return locks;
} }
return locks;
} }
} }

View File

@ -26,6 +26,7 @@ import com.google.inject.Provides;
import com.google.inject.util.Providers; import com.google.inject.util.Providers;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.airlift.command.Command; import io.airlift.command.Command;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.Jerseys; import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider; import io.druid.guice.JsonConfigProvider;
@ -101,7 +102,8 @@ public class CliMiddleManager extends ServerRunnable
config.getVersion() config.getVersion()
); );
} }
} },
new IndexingServiceFirehoseModule()
); );
} }
} }

View File

@ -30,6 +30,7 @@ import com.google.inject.servlet.GuiceFilter;
import com.google.inject.util.Providers; import com.google.inject.util.Providers;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.airlift.command.Command; import io.airlift.command.Command;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.JacksonConfigProvider; import io.druid.guice.JacksonConfigProvider;
import io.druid.guice.Jerseys; import io.druid.guice.Jerseys;
@ -206,7 +207,8 @@ public class CliOverlord extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class);
} }
} },
new IndexingServiceFirehoseModule()
); );
} }

View File

@ -19,13 +19,12 @@
package io.druid.cli; package io.druid.cli;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Injector; import com.google.inject.Injector;
import com.google.inject.Key; import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder; import com.google.inject.multibindings.MapBinder;
import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
@ -33,6 +32,7 @@ import io.airlift.command.Arguments;
import io.airlift.command.Command; import io.airlift.command.Command;
import io.airlift.command.Option; import io.airlift.command.Option;
import io.druid.guice.Binders; import io.druid.guice.Binders;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.Jerseys; import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider; import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton; import io.druid.guice.LazySingleton;
@ -49,7 +49,6 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.index.ChatHandlerProvider; import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.NoopChatHandlerProvider; import io.druid.indexing.common.index.NoopChatHandlerProvider;
import io.druid.indexing.common.index.ServiceAnnouncingChatHandlerProvider; import io.druid.indexing.common.index.ServiceAnnouncingChatHandlerProvider;
import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.HeapMemoryTaskStorage;
@ -100,7 +99,7 @@ public class CliPeon extends GuiceRunnable
protected List<Object> getModules() protected List<Object> getModules()
{ {
return ImmutableList.<Object>of( return ImmutableList.<Object>of(
new DruidModule() new Module()
{ {
@Override @Override
public void configure(Binder binder) public void configure(Binder binder)
@ -179,16 +178,8 @@ public class CliPeon extends GuiceRunnable
.to(RemoteTaskActionClientFactory.class).in(LazySingleton.class); .to(RemoteTaskActionClientFactory.class).in(LazySingleton.class);
} }
},
@Override new IndexingServiceFirehoseModule()
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return Arrays.asList(
new SimpleModule("PeonModule")
.registerSubtypes(new NamedType(EventReceiverFirehoseFactory.class, "receiver"))
);
}
}
); );
} }