1) ExecutorNode is working, except for the running of the task. Need to adjust it to be able to run a task and then everything will be wonderful

This commit is contained in:
cheddar 2013-08-26 18:08:41 -05:00
parent 6636ef1ea8
commit 269997dc94
31 changed files with 469 additions and 250 deletions

View File

@ -4,6 +4,7 @@ package com.metamx.druid.client.selector;
*/
public interface Server
{
public String getScheme();
public String getHost();
public int getPort();
}

View File

@ -81,7 +81,7 @@ public class JettyServerModule extends JerseyServletModule
.annotatedWith(Names.named("resourceClasses"))
.toInstance(theResources);
for (Class<?> resource : theResources) {
binder.bind(resource);
binder.bind(resource).in(LazySingleton.class);
}
binder.bind(Key.get(Server.class, Names.named("ForTheEagerness"))).to(Server.class).asEagerSingleton();

View File

@ -32,9 +32,7 @@ public class Execs
{
public static ExecutorService singleThreaded(String nameFormat)
{
return Executors.newSingleThreadExecutor(
makeThreadFactory(nameFormat)
);
return Executors.newSingleThreadExecutor(makeThreadFactory(nameFormat));
}
public static ExecutorService multiThreaded(int threads, String nameFormat)
@ -44,9 +42,7 @@ public class Execs
public static ScheduledExecutorService scheduledSingleThreaded(String nameFormat)
{
return Executors.newSingleThreadScheduledExecutor(
makeThreadFactory(nameFormat)
);
return Executors.newSingleThreadScheduledExecutor(makeThreadFactory(nameFormat));
}
public static ThreadFactory makeThreadFactory(String nameFormat)

View File

@ -47,7 +47,6 @@ public class MiddleManagerModule implements Module
binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class);
binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class);
}
@Provides @LazySingleton

View File

@ -0,0 +1,67 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import com.metamx.druid.indexing.common.RetryPolicyConfig;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.TaskToolboxFactory;
import com.metamx.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.index.ChatHandlerProvider;
import com.metamx.druid.indexing.common.index.EventReceivingChatHandlerProvider;
import com.metamx.druid.indexing.common.index.NoopChatHandlerProvider;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.S3DataSegmentKiller;
/**
*/
public class PeonModule implements Module
{
@Override
public void configure(Binder binder)
{
PolyBind.createChoice(
binder,
"druid.indexer.task.chathandler.type",
Key.get(ChatHandlerProvider.class),
Key.get(NoopChatHandlerProvider.class)
);
final MapBinder<String, ChatHandlerProvider> handlerProviderBinder = PolyBind.optionBinder(
binder, Key.get(ChatHandlerProvider.class)
);
handlerProviderBinder.addBinding("curator").to(EventReceivingChatHandlerProvider.class);
handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class);
binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
JsonConfigProvider.bind(binder, "druid.worker.taskActionClient.retry", RetryPolicyConfig.class);
binder.bind(TaskActionClientFactory.class).to(RemoteTaskActionClientFactory.class).in(LazySingleton.class);
binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
binder.bind(DataSegmentKiller.class).to(S3DataSegmentKiller.class).in(LazySingleton.class);
}
}

View File

@ -19,7 +19,6 @@
package com.metamx.druid.indexing.common;
import com.metamx.druid.indexing.common.config.RetryPolicyConfig;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.Duration;
@ -29,41 +28,35 @@ public class RetryPolicy
{
private static final EmittingLogger log = new EmittingLogger(RetryPolicy.class);
private final long MAX_NUM_RETRIES;
private final Duration MAX_RETRY_DURATION;
private final long maxNumRetries;
private final Duration maxRetryDelay;
private volatile Duration currRetryDelay;
private volatile int retryCount;
public RetryPolicy(RetryPolicyConfig config)
{
this.MAX_NUM_RETRIES = config.getMaxRetryCount();
this.MAX_RETRY_DURATION = config.getRetryMaxDuration();
this.maxNumRetries = config.getMaxRetryCount();
this.maxRetryDelay = config.getMaxWait().toStandardDuration();
this.currRetryDelay = config.getRetryMinDuration();
this.currRetryDelay = config.getMinWait().toStandardDuration();
this.retryCount = 0;
}
public Duration getRetryDelay()
{
return currRetryDelay;
}
public Duration getAndIncrementRetryDelay()
{
Duration retVal = new Duration(currRetryDelay);
currRetryDelay = new Duration(Math.min(currRetryDelay.getMillis() * 2, MAX_RETRY_DURATION.getMillis()));
retryCount++;
return retVal;
if (hasExceededRetryThreshold()) {
return null;
}
public int getNumRetries()
{
return retryCount;
Duration retVal = currRetryDelay;
currRetryDelay = new Duration(Math.min(currRetryDelay.getMillis() * 2, maxRetryDelay.getMillis()));
++retryCount;
return retVal;
}
public boolean hasExceededRetryThreshold()
{
return retryCount >= MAX_NUM_RETRIES;
return retryCount >= maxNumRetries;
}
}

View File

@ -0,0 +1,70 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.common;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;
/**
*/
public class RetryPolicyConfig
{
@JsonProperty
private Period minWait = new Period("PT1M");
@JsonProperty
private Period maxWait = new Period("PT10M");
@JsonProperty
private long maxRetryCount = 10;
public Period getMinWait()
{
return minWait;
}
RetryPolicyConfig setMinWait(Period minWait)
{
this.minWait = minWait;
return this;
}
public Period getMaxWait()
{
return maxWait;
}
RetryPolicyConfig setMaxWait(Period maxWait)
{
this.maxWait = maxWait;
return this;
}
public long getMaxRetryCount()
{
return maxRetryCount;
}
RetryPolicyConfig setMaxRetryCount(long maxRetryCount)
{
this.maxRetryCount = maxRetryCount;
return this;
}
}

View File

@ -19,7 +19,7 @@
package com.metamx.druid.indexing.common;
import com.metamx.druid.indexing.common.config.RetryPolicyConfig;
import com.google.inject.Inject;
/**
*/
@ -27,6 +27,7 @@ public class RetryPolicyFactory
{
private final RetryPolicyConfig config;
@Inject
public RetryPolicyFactory(RetryPolicyConfig config)
{
this.config = config;

View File

@ -20,6 +20,7 @@
package com.metamx.druid.indexing.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
@ -49,6 +50,7 @@ public class TaskToolboxFactory
private final MonitorScheduler monitorScheduler;
private final ObjectMapper objectMapper;
@Inject
public TaskToolboxFactory(
TaskConfig config,
TaskActionClientFactory taskActionClientFactory,

View File

@ -6,13 +6,13 @@ import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.indexing.IndexingServiceSelector;
import com.metamx.druid.client.selector.Server;
import com.metamx.druid.indexing.common.RetryPolicy;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.ToStringResponseHandler;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.joda.time.Duration;
import java.io.IOException;
@ -23,7 +23,7 @@ public class RemoteTaskActionClient implements TaskActionClient
{
private final Task task;
private final HttpClient httpClient;
private final ServiceProvider serviceProvider;
private final IndexingServiceSelector serviceProvider;
private final RetryPolicyFactory retryPolicyFactory;
private final ObjectMapper jsonMapper;
@ -32,7 +32,7 @@ public class RemoteTaskActionClient implements TaskActionClient
public RemoteTaskActionClient(
Task task,
HttpClient httpClient,
ServiceProvider serviceProvider,
IndexingServiceSelector serviceProvider,
RetryPolicyFactory retryPolicyFactory,
ObjectMapper jsonMapper
)
@ -79,20 +79,23 @@ public class RemoteTaskActionClient implements TaskActionClient
}
final Map<String, Object> responseDict = jsonMapper.readValue(
response,
new TypeReference<Map<String, Object>>() {}
response, new TypeReference<Map<String, Object>>()
{
}
);
return jsonMapper.convertValue(responseDict.get("result"), taskAction.getReturnTypeReference());
} catch(IOException e) {
log.warn(e, "Exception submitting action for task: %s", task.getId());
}
catch (IOException e) {
log.warn(e, "Exception submitting action for task[%s]", task.getId());
if (retryPolicy.hasExceededRetryThreshold()) {
final Duration delay = retryPolicy.getAndIncrementRetryDelay();
if (delay == null) {
throw e;
} else {
try {
final long sleepTime = retryPolicy.getAndIncrementRetryDelay().getMillis();
log.info("Will try again in %s.", new Duration(sleepTime).toString());
final long sleepTime = delay.getMillis();
log.info("Will try again in [%s].", new Duration(sleepTime).toString());
Thread.sleep(sleepTime);
}
catch (InterruptedException e2) {
@ -105,26 +108,19 @@ public class RemoteTaskActionClient implements TaskActionClient
private URI getServiceUri() throws Exception
{
final ServiceInstance instance = serviceProvider.getInstance();
final String scheme;
final String host;
final int port;
final String path = "/druid/indexer/v1/action";
final Server instance = serviceProvider.pick();
if (instance == null) {
throw new ISE("Cannot find instance of indexer to talk to!");
}
host = instance.getAddress();
if (instance.getSslPort() != null && instance.getSslPort() > 0) {
scheme = "https";
port = instance.getSslPort();
} else {
scheme = "http";
port = instance.getPort();
}
return new URI(scheme, null, host, port, path, null, null);
return new URI(
instance.getScheme(),
null,
instance.getHost(),
instance.getPort(),
"/druid/indexer/v1/action",
null,
null
);
}
}

View File

@ -20,23 +20,26 @@
package com.metamx.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.indexing.common.task.Task;
import com.google.inject.Inject;
import com.metamx.druid.client.indexing.IndexingServiceSelector;
import com.metamx.druid.guice.annotations.Global;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.http.client.HttpClient;
import org.apache.curator.x.discovery.ServiceProvider;
/**
*/
public class RemoteTaskActionClientFactory implements TaskActionClientFactory
{
private final HttpClient httpClient;
private final ServiceProvider serviceProvider;
private final IndexingServiceSelector serviceProvider;
private final RetryPolicyFactory retryPolicyFactory;
private final ObjectMapper jsonMapper;
@Inject
public RemoteTaskActionClientFactory(
HttpClient httpClient,
ServiceProvider serviceProvider,
@Global HttpClient httpClient,
IndexingServiceSelector serviceProvider,
RetryPolicyFactory retryPolicyFactory,
ObjectMapper jsonMapper
)

View File

@ -19,38 +19,65 @@
package com.metamx.druid.indexing.common.config;
import com.google.common.base.Joiner;
import org.skife.config.Config;
import org.skife.config.Default;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.File;
public abstract class TaskConfig
public class TaskConfig
{
private static Joiner joiner = Joiner.on("/");
@JsonProperty
private final String baseDir;
@Config("druid.indexer.baseDir")
@Default("/tmp/")
public abstract String getBaseDir();
@JsonProperty
private final File baseTaskDir;
@JsonProperty
private final String hadoopWorkingPath;
@JsonProperty
private final int defaultRowFlushBoundary;
@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
@JsonProperty("baseTaskDir") String baseTaskDir,
@JsonProperty("hadoopWorkingPath") String hadoopWorkingPath,
@JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary
)
{
this.baseDir = baseDir == null ? "/tmp" : baseDir;
this.baseTaskDir = new File(defaultDir(baseTaskDir, "persistent/task"));
this.hadoopWorkingPath = defaultDir(hadoopWorkingPath, "druid-indexing");
this.defaultRowFlushBoundary = defaultRowFlushBoundary == null ? 500000 : defaultRowFlushBoundary;
}
public String getBaseDir()
{
return baseDir;
}
@Config("druid.indexer.taskDir")
public File getBaseTaskDir()
{
return new File(defaultPath("persistent/task"));
return baseTaskDir;
}
@Config("druid.indexer.hadoopWorkingPath")
public String getHadoopWorkingPath()
{
return defaultPath("druid-indexing");
return hadoopWorkingPath;
}
@Config("druid.indexer.rowFlushBoundary")
@Default("500000")
public abstract int getDefaultRowFlushBoundary();
private String defaultPath(String subPath)
public int getDefaultRowFlushBoundary()
{
return joiner.join(getBaseDir(), subPath);
return defaultRowFlushBoundary;
}
private String defaultDir(String configParameter, final String defaultVal)
{
if (configParameter == null) {
return String.format("%s/%s", getBaseDir(), defaultVal);
}
return configParameter;
}
}

View File

@ -21,10 +21,11 @@ package com.metamx.druid.indexing.common.index;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig;
import com.metamx.druid.guice.annotations.Self;
import com.metamx.druid.initialization.DruidNode;
import java.util.concurrent.ConcurrentMap;
@ -38,16 +39,17 @@ public class EventReceivingChatHandlerProvider implements ChatHandlerProvider
{
private static final Logger log = new Logger(EventReceivingChatHandlerProvider.class);
private final ChatHandlerProviderConfig config;
private final DruidNode node;
private final ServiceAnnouncer serviceAnnouncer;
private final ConcurrentMap<String, ChatHandler> handlers;
@Inject
public EventReceivingChatHandlerProvider(
ChatHandlerProviderConfig config,
@Self DruidNode node,
ServiceAnnouncer serviceAnnouncer
)
{
this.config = config;
this.node = node;
this.serviceAnnouncer = serviceAnnouncer;
this.handlers = Maps.newConcurrentMap();
}
@ -100,6 +102,6 @@ public class EventReceivingChatHandlerProvider implements ChatHandlerProvider
private DruidNode makeDruidNode(String key)
{
return new DruidNode(key, config.getHost(), config.getPort());
return new DruidNode(key, node.getHost(), node.getPort());
}
}

View File

@ -28,8 +28,10 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.Query;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolbox;
import com.metamx.druid.indexing.common.TaskToolboxFactory;
@ -48,7 +50,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
/**
* Runs tasks in a JVM thread using an ExecutorService.
@ -61,13 +62,13 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class);
@Inject
public ThreadPoolTaskRunner(
TaskToolboxFactory toolboxFactory,
ExecutorService exec
TaskToolboxFactory toolboxFactory
)
{
this.toolboxFactory = toolboxFactory;
this.exec = MoreExecutors.listeningDecorator(exec);
this.exec = MoreExecutors.listeningDecorator(Execs.singleThreaded("task-runner-%d"));
}
@LifecycleStop

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
@ -36,16 +35,17 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.client.indexing.IndexingServiceSelector;
import com.metamx.druid.curator.CuratorConfig;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.indexing.common.RetryPolicyConfig;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.TaskToolboxFactory;
import com.metamx.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.indexing.common.config.RetryPolicyConfig;
import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.index.ChatHandlerProvider;
import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory;
@ -86,7 +86,6 @@ import org.jets3t.service.security.AWSCredentials;
import org.skife.config.ConfigurationObjectFactory;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
@ -322,7 +321,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
taskConfig,
new RemoteTaskActionClientFactory(
httpClient,
coordinatorServiceProvider,
new IndexingServiceSelector(coordinatorServiceProvider),
new RetryPolicyFactory(
configFactory.buildWithReplacements(
RetryPolicyConfig.class,
@ -371,16 +370,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
public void initializeTaskRunner()
{
if (taskRunner == null) {
this.taskRunner = lifecycle.addManagedInstance(
new ThreadPoolTaskRunner(
taskToolboxFactory,
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setNameFormat("task-runner-%d")
.build()
)
)
);
this.taskRunner = lifecycle.addManagedInstance(new ThreadPoolTaskRunner(taskToolboxFactory));
}
}
@ -389,7 +379,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
if (chatHandlerProvider == null) {
final ChatHandlerProviderConfig config = configFactory.build(ChatHandlerProviderConfig.class);
if (config.isPublishDiscovery()) {
this.chatHandlerProvider = new EventReceivingChatHandlerProvider(config, serviceAnnouncer);
this.chatHandlerProvider = new EventReceivingChatHandlerProvider(null, serviceAnnouncer); // TODO: eliminate
} else {
log.info("ChatHandlerProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!");
this.chatHandlerProvider = new NoopChatHandlerProvider();

View File

@ -1,9 +1,8 @@
package com.metamx.druid.indexing.coordinator;
package com.metamx.druid.indexing.common;
import com.metamx.druid.indexing.common.RetryPolicy;
import com.metamx.druid.indexing.common.config.RetryPolicyConfig;
import junit.framework.Assert;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Test;
/**
@ -15,25 +14,9 @@ public class RetryPolicyTest
{
RetryPolicy retryPolicy = new RetryPolicy(
new RetryPolicyConfig()
{
@Override
public Duration getRetryMinDuration()
{
return new Duration("PT1S");
}
@Override
public Duration getRetryMaxDuration()
{
return new Duration("PT10S");
}
@Override
public long getMaxRetryCount()
{
return 10;
}
}
.setMinWait(new Period("PT1S"))
.setMaxWait(new Period("PT10S"))
.setMaxRetryCount(6)
);
Assert.assertEquals(new Duration("PT1S"), retryPolicy.getAndIncrementRetryDelay());
@ -42,5 +25,7 @@ public class RetryPolicyTest
Assert.assertEquals(new Duration("PT8S"), retryPolicy.getAndIncrementRetryDelay());
Assert.assertEquals(new Duration("PT10S"), retryPolicy.getAndIncrementRetryDelay());
Assert.assertEquals(new Duration("PT10S"), retryPolicy.getAndIncrementRetryDelay());
Assert.assertEquals(null, retryPolicy.getAndIncrementRetryDelay());
Assert.assertTrue(retryPolicy.hasExceededRetryThreshold());
}
}

View File

@ -49,7 +49,6 @@ import org.junit.Test;
import java.io.File;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -306,6 +305,8 @@ public class RemoteTaskRunnerTest
);
workerCuratorCoordinator.start();
final File tmp = Files.createTempDir();
// Start a task monitor
workerTaskMonitor = new WorkerTaskMonitor(
jsonMapper,
@ -313,29 +314,9 @@ public class RemoteTaskRunnerTest
workerCuratorCoordinator,
new ThreadPoolTaskRunner(
new TaskToolboxFactory(
new TaskConfig()
{
@Override
public String getBaseDir()
{
File tmp = Files.createTempDir();
tmp.deleteOnExit();
return tmp.toString();
}
@Override
public int getDefaultRowFlushBoundary()
{
return 0;
}
@Override
public String getHadoopWorkingPath()
{
return null;
}
}, null, null, null, null, null, null, null, null, null, jsonMapper
), Executors.newSingleThreadExecutor()
new TaskConfig(tmp.toString(), null, null, 0),
null, null, null, null, null, null, null, null, null, jsonMapper
)
),
new WorkerConfig().setCapacity(1)
);

View File

@ -78,7 +78,6 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
public class TaskLifecycleTest
{
@ -116,26 +115,7 @@ public class TaskLifecycleTest
tac = new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tq, tl, mdc, newMockEmitter()));
tb = new TaskToolboxFactory(
new TaskConfig()
{
@Override
public String getBaseDir()
{
return tmp.toString();
}
@Override
public int getDefaultRowFlushBoundary()
{
return 50000;
}
@Override
public String getHadoopWorkingPath()
{
return null;
}
},
new TaskConfig(tmp.toString(), null, null, 50000),
tac,
newMockEmitter(),
null, // s3 client
@ -162,10 +142,7 @@ public class TaskLifecycleTest
new DefaultObjectMapper()
);
tr = new ThreadPoolTaskRunner(
tb,
Executors.newSingleThreadExecutor()
);
tr = new ThreadPoolTaskRunner(tb);
tc = new TaskConsumer(tq, tr, tac, newMockEmitter());
tsqa = new TaskStorageQueryAdapter(ts);

View File

@ -52,6 +52,12 @@ public class IndexingServiceSelector implements DiscoverySelector<Server>
{
return instance.getPort();
}
@Override
public String getScheme()
{
return "http";
}
};
}

View File

@ -1,6 +1,6 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
@ -17,25 +17,19 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.common.config;
package com.metamx.druid.client.indexing;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public abstract class RetryPolicyConfig
public class IndexingServiceSelectorConfig
{
@Config("${base_path}.retry.minWaitMillis")
@Default("PT1M") // 1 minute
public abstract Duration getRetryMinDuration();
@JsonProperty
private String serviceName = null;
@Config("${base_path}.retry.maxWaitMillis")
@Default("PT10M") // 10 minutes
public abstract Duration getRetryMaxDuration();
@Config("${base_path}.retry.maxRetryCount")
@Default("10")
public abstract long getMaxRetryCount();
public String getServiceName()
{
return serviceName;
}
}

View File

@ -4,14 +4,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.indexing.IndexingService;
import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.client.indexing.IndexingServiceSelector;
import com.metamx.druid.client.selector.DiscoverySelector;
import com.metamx.druid.client.selector.Server;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseRuleManagerConfig;
import com.metamx.druid.db.DatabaseRuleManagerProvider;
@ -26,11 +21,6 @@ import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterConfig;
import com.metamx.druid.master.LoadQueueTaskMaster;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import java.io.IOException;
/**
*/
@ -56,10 +46,6 @@ public class CoordinatorModule implements Module
.toProvider(DatabaseRuleManagerProvider.class)
.in(ManageLifecycle.class);
binder.bind(new TypeLiteral<DiscoverySelector<Server>>(){})
.annotatedWith(IndexingService.class)
.to(IndexingServiceSelector.class)
.in(ManageLifecycle.class);
binder.bind(IndexingServiceClient.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(MasterRedirectInfo.class).in(LazySingleton.class);
@ -67,35 +53,6 @@ public class CoordinatorModule implements Module
binder.bind(DruidMaster.class);
}
@Provides @LazySingleton @IndexingService
public ServiceProvider getServiceProvider(DruidMasterConfig config, ServiceDiscovery<Void> serviceDiscovery)
{
// TODO: This service discovery stuff is really really janky. It needs to be reworked.
if (config.getMergerServiceName() == null) {
return new ServiceProvider()
{
@Override
public void start() throws Exception
{
}
@Override
public ServiceInstance getInstance() throws Exception
{
return null;
}
@Override
public void close() throws IOException
{
}
};
}
return serviceDiscovery.serviceProviderBuilder().serviceName(config.getMergerServiceName()).build();
}
@Provides @LazySingleton
public LoadQueueTaskMaster getLoadQueueTaskMaster(
CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, DruidMasterConfig config

View File

@ -0,0 +1,83 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.metamx.druid.client.indexing.IndexingService;
import com.metamx.druid.client.indexing.IndexingServiceSelector;
import com.metamx.druid.client.indexing.IndexingServiceSelectorConfig;
import com.metamx.druid.client.selector.DiscoverySelector;
import com.metamx.druid.client.selector.Server;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import java.io.IOException;
/**
*/
public class IndexingServiceDiscoveryModule implements Module
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.selectors.indexing", IndexingServiceSelectorConfig.class);
binder.bind(new TypeLiteral<DiscoverySelector<Server>>(){})
.annotatedWith(IndexingService.class)
.to(IndexingServiceSelector.class);
binder.bind(IndexingServiceSelector.class).in(ManageLifecycle.class);
}
@Provides
@LazySingleton @IndexingService
public ServiceProvider getServiceProvider(
IndexingServiceSelectorConfig config,
ServiceDiscovery<Void> serviceDiscovery
)
{
if (config.getServiceName() == null) {
return new ServiceProvider()
{
@Override
public void start() throws Exception
{
}
@Override
public ServiceInstance getInstance() throws Exception
{
return null;
}
@Override
public void close() throws IOException
{
}
};
}
return serviceDiscovery.serviceProviderBuilder().serviceName(config.getServiceName()).build();
}
}

View File

@ -20,6 +20,7 @@
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.coordination.DruidServerMetadata;
@ -33,7 +34,7 @@ import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
/**
*/
public class StorageNodeModule extends ServerModule
public class StorageNodeModule implements Module
{
private final String nodeType;
@ -45,8 +46,6 @@ public class StorageNodeModule extends ServerModule
@Override
public void configure(Binder binder)
{
super.configure(binder);
JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class);
JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class);

View File

@ -60,12 +60,6 @@ public abstract class DruidMasterConfig
return false;
}
@Config("druid.master.merger.service")
public String getMergerServiceName()
{
return null;
}
@Config("druid.master.merge.threshold")
public long getMergeBytesLimit()
{

View File

@ -100,12 +100,6 @@ public class DruidMasterTest
return super.getMillisToWaitBeforeDeleting();
}
@Override
public String getMergerServiceName()
{
return "";
}
@Override
public int getMaxSegmentsToMove()
{

View File

@ -8,6 +8,7 @@ import com.metamx.druid.curator.discovery.DiscoveryModule;
import com.metamx.druid.guice.CoordinatorModule;
import com.metamx.druid.guice.DbConnectorModule;
import com.metamx.druid.guice.HttpClientModule;
import com.metamx.druid.guice.IndexingServiceDiscoveryModule;
import com.metamx.druid.guice.JacksonConfigManagerModule;
import com.metamx.druid.guice.LifecycleModule;
import com.metamx.druid.guice.ServerModule;
@ -68,6 +69,7 @@ public class CliCoordinator extends ServerRunnable
.addResource(MasterResource.class)
.addResource(StatusResource.class),
new ServerViewModule(),
new IndexingServiceDiscoveryModule(),
CoordinatorModule.class
);
}

View File

@ -33,6 +33,7 @@ import com.metamx.druid.guice.HttpClientModule;
import com.metamx.druid.guice.LifecycleModule;
import com.metamx.druid.guice.QueryRunnerFactoryModule;
import com.metamx.druid.guice.QueryableModule;
import com.metamx.druid.guice.ServerModule;
import com.metamx.druid.guice.StorageNodeModule;
import com.metamx.druid.http.StatusResource;
import com.metamx.druid.initialization.EmitterModule;
@ -70,6 +71,7 @@ public class CliHistorical extends ServerRunnable
AWSModule.class,
DataSegmentPullerModule.class,
new MetricsModule().register(ServerMonitor.class),
new ServerModule(),
new StorageNodeModule("historical"),
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class),

View File

@ -0,0 +1,92 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli;
import com.google.inject.Injector;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.CuratorModule;
import com.metamx.druid.curator.discovery.DiscoveryModule;
import com.metamx.druid.guice.AWSModule;
import com.metamx.druid.guice.AnnouncerModule;
import com.metamx.druid.guice.DataSegmentPusherModule;
import com.metamx.druid.guice.DruidProcessingModule;
import com.metamx.druid.guice.HttpClientModule;
import com.metamx.druid.guice.IndexingServiceDiscoveryModule;
import com.metamx.druid.guice.LifecycleModule;
import com.metamx.druid.guice.PeonModule;
import com.metamx.druid.guice.QueryRunnerFactoryModule;
import com.metamx.druid.guice.QueryableModule;
import com.metamx.druid.guice.ServerModule;
import com.metamx.druid.guice.ServerViewModule;
import com.metamx.druid.guice.StorageNodeModule;
import com.metamx.druid.http.StatusResource;
import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner;
import com.metamx.druid.indexing.worker.executor.ChatHandlerResource;
import com.metamx.druid.initialization.EmitterModule;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.JettyServerModule;
import com.metamx.druid.metrics.MetricsModule;
import io.airlift.command.Command;
/**
*/
@Command(
name = "peon",
description = "Runs a Peon, this is an individual forked \"task\" used as part of the indexing service. "
+ "This should rarely, if ever, be used directly."
)
public class CliPeon extends ServerRunnable
{
private static final Logger log = new Logger(CliPeon.class);
public CliPeon()
{
super(log);
}
@Override
protected Injector getInjector()
{
// TODO: make it take and run a task
return Initialization.makeInjector(
new LifecycleModule(),
EmitterModule.class,
HttpClientModule.global(),
CuratorModule.class,
new MetricsModule(),
new ServerModule(),
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class)
.addResource(ChatHandlerResource.class),
new DiscoveryModule(),
new ServerViewModule(),
new StorageNodeModule("real-time"),
new DataSegmentPusherModule(),
new AnnouncerModule(),
new DruidProcessingModule(),
new QueryableModule(ThreadPoolTaskRunner.class),
new QueryRunnerFactoryModule(),
new IndexingServiceDiscoveryModule(),
new AWSModule(),
new PeonModule()
);
}
}

View File

@ -32,6 +32,7 @@ import com.metamx.druid.guice.LifecycleModule;
import com.metamx.druid.guice.QueryRunnerFactoryModule;
import com.metamx.druid.guice.QueryableModule;
import com.metamx.druid.guice.RealtimeModule;
import com.metamx.druid.guice.ServerModule;
import com.metamx.druid.guice.ServerViewModule;
import com.metamx.druid.guice.StorageNodeModule;
import com.metamx.druid.http.StatusResource;
@ -71,6 +72,7 @@ public class CliRealtime extends ServerRunnable
AWSModule.class,
DataSegmentPusherModule.class,
new MetricsModule(),
new ServerModule(),
new StorageNodeModule("realtime"),
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class),

View File

@ -25,6 +25,7 @@ import com.metamx.druid.guice.DruidProcessingModule;
import com.metamx.druid.guice.LifecycleModule;
import com.metamx.druid.guice.QueryRunnerFactoryModule;
import com.metamx.druid.guice.QueryableModule;
import com.metamx.druid.guice.ServerModule;
import com.metamx.druid.guice.StorageNodeModule;
import com.metamx.druid.http.StatusResource;
import com.metamx.druid.initialization.EmitterModule;
@ -56,6 +57,7 @@ public class CliRealtimeExample extends ServerRunnable
new LifecycleModule(),
EmitterModule.class,
DruidProcessingModule.class,
new ServerModule(),
new StorageNodeModule("realtime"),
new JettyServerModule(new QueryJettyServerInitializer())
.addResource(StatusResource.class),

View File

@ -40,7 +40,8 @@ public class Main
.withDescription("Run one of the Druid server types.")
.withDefaultCommand(Help.class)
.withCommands(
CliCoordinator.class, CliHistorical.class, CliBroker.class, CliRealtime.class, CliOverlord.class, CliMiddleManager.class
CliCoordinator.class, CliHistorical.class, CliBroker.class, CliRealtime.class,
CliOverlord.class, CliMiddleManager.class, CliPeon.class
);
builder.withGroup("example")