- RealtimeIndexTask: Drop locks on startup
- RealtimeIndexTask: Acquire lock before announcing a new segment
- Retry failed RemoteTaskActionClient submissions using a RetryPolicy
- Add comments to RealtimeIndexTask
This commit is contained in:
Gian Merlino 2013-03-11 14:08:05 -07:00
parent 6245e38981
commit 34d6b3a7f0
14 changed files with 143 additions and 75 deletions

View File

@@ -17,9 +17,9 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator;
package com.metamx.druid.merger.common;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.Duration;

View File

@@ -17,9 +17,9 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator;
package com.metamx.druid.merger.common;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
/**
*/

View File

@@ -24,6 +24,8 @@ public class LocalTaskActionClient implements TaskActionClient
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
log.info("Performing action for task[%s]: %s", task.getId(), taskAction);
final RetType ret = taskAction.perform(task, toolbox);
// Add audit log

View File

@@ -1,15 +1,11 @@
package com.metamx.druid.merger.common.actions;
import com.google.common.base.Throwables;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.task.Task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.metamx.druid.merger.common.task.Task;
import org.joda.time.Interval;
import java.util.List;
public class LockReleaseAction implements TaskAction<Void>
{
private final Interval interval;

View File

@@ -4,6 +4,8 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.metamx.common.logger.Logger;
import com.metamx.druid.merger.common.RetryPolicy;
import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.ToStringResponseHandler;
@@ -21,50 +23,77 @@ public class RemoteTaskActionClient implements TaskActionClient
private final Task task;
private final HttpClient httpClient;
private final ServiceProvider serviceProvider;
private final RetryPolicyFactory retryPolicyFactory;
private final ObjectMapper jsonMapper;
private static final Logger log = new Logger(RemoteTaskActionClient.class);
public RemoteTaskActionClient(Task task, HttpClient httpClient, ServiceProvider serviceProvider, ObjectMapper jsonMapper)
public RemoteTaskActionClient(
Task task,
HttpClient httpClient,
ServiceProvider serviceProvider,
RetryPolicyFactory retryPolicyFactory,
ObjectMapper jsonMapper
)
{
this.task = task;
this.httpClient = httpClient;
this.serviceProvider = serviceProvider;
this.retryPolicyFactory = retryPolicyFactory;
this.jsonMapper = jsonMapper;
}
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
log.info("Performing action for task[%s]: %s", task.getId(), taskAction);
byte[] dataToSend = jsonMapper.writeValueAsBytes(new TaskActionHolder(task, taskAction));
final URI serviceUri;
try {
serviceUri = getServiceUri();
}
catch (Exception e) {
throw new IOException("Failed to locate service uri", e);
}
final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy();
final String response;
while (true) {
try {
final URI serviceUri;
try {
serviceUri = getServiceUri();
}
catch (Exception e) {
throw new IOException("Failed to locate service uri", e);
}
try {
response = httpClient.post(serviceUri.toURL())
.setContent("application/json", dataToSend)
.go(new ToStringResponseHandler(Charsets.UTF_8))
.get();
final String response;
try {
response = httpClient.post(serviceUri.toURL())
.setContent("application/json", dataToSend)
.go(new ToStringResponseHandler(Charsets.UTF_8))
.get();
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);
throw Throwables.propagate(e);
}
final Map<String, Object> responseDict = jsonMapper.readValue(
response,
new TypeReference<Map<String, Object>>() {}
);
return jsonMapper.convertValue(responseDict.get("result"), taskAction.getReturnTypeReference());
} catch(IOException e) {
if (retryPolicy.hasExceededRetryThreshold()) {
throw e;
} else {
try {
Thread.sleep(retryPolicy.getAndIncrementRetryDelay().getMillis());
}
catch (InterruptedException e2) {
throw Throwables.propagate(e2);
}
}
}
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);
throw Throwables.propagate(e);
}
final Map<String, Object> responseDict = jsonMapper.readValue(
response,
new TypeReference<Map<String, Object>>() {}
);
return jsonMapper.convertValue(responseDict.get("result"), taskAction.getReturnTypeReference());
}
private URI getServiceUri() throws Exception

View File

@@ -21,6 +21,7 @@ package com.metamx.druid.merger.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.http.client.HttpClient;
import com.netflix.curator.x.discovery.ServiceProvider;
@@ -30,18 +31,25 @@ public class RemoteTaskActionClientFactory implements TaskActionClientFactory
{
private final HttpClient httpClient;
private final ServiceProvider serviceProvider;
private final RetryPolicyFactory retryPolicyFactory;
private final ObjectMapper jsonMapper;
public RemoteTaskActionClientFactory(HttpClient httpClient, ServiceProvider serviceProvider, ObjectMapper jsonMapper)
public RemoteTaskActionClientFactory(
HttpClient httpClient,
ServiceProvider serviceProvider,
RetryPolicyFactory retryPolicyFactory,
ObjectMapper jsonMapper
)
{
this.httpClient = httpClient;
this.serviceProvider = serviceProvider;
this.retryPolicyFactory = retryPolicyFactory;
this.jsonMapper = jsonMapper;
}
@Override
public TaskActionClient create(Task task)
{
return new RemoteTaskActionClient(task, httpClient, serviceProvider, jsonMapper);
return new RemoteTaskActionClient(task, httpClient, serviceProvider, retryPolicyFactory, jsonMapper);
}
}

View File

@@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.config;
package com.metamx.druid.merger.common.config;
import org.joda.time.Duration;
import org.skife.config.Config;
@@ -27,15 +27,15 @@ import org.skife.config.Default;
*/
public abstract class RetryPolicyConfig
{
@Config("druid.indexer.retry.minWaitMillis")
@Config("${base_path}.retry.minWaitMillis")
@Default("PT1M") // 1 minute
public abstract Duration getRetryMinDuration();
@Config("druid.indexer.retry.maxWaitMillis")
@Config("${base_path}.retry.maxWaitMillis")
@Default("PT10M") // 10 minutes
public abstract Duration getRetryMaxDuration();
@Config("druid.indexer.retry.maxRetryCount")
@Config("${base_path}.retry.maxRetryCount")
@Default("10")
public abstract long getMaxRetryCount();
}

View File

@@ -14,6 +14,7 @@ import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.LockAcquireAction;
import com.metamx.druid.merger.common.actions.LockListAction;
import com.metamx.druid.merger.common.actions.LockReleaseAction;
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.query.QueryRunner;
@@ -109,14 +110,20 @@ public class RealtimeIndexTask extends AbstractTask
throw new IllegalStateException("WTF?!? run with non-null plumber??!");
}
// Shed any locks we might have (e.g. if we were uncleanly killed and restarted) since we'll reacquire
// them if we actually need them
for (final TaskLock taskLock : toolbox.getTaskActionClient().submit(new LockListAction())) {
toolbox.getTaskActionClient().submit(new LockReleaseAction(taskLock.getInterval()));
}
boolean normalExit = true;
final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
final Period intermediatePersistPeriod = fireDepartmentConfig.getIntermediatePersistPeriod();
final Firehose firehose = firehoseFactory.connect();
// TODO Take PlumberSchool in constructor (although that will need jackson injectables for stuff like
// TODO the ServerView, which seems kind of odd?)
// TODO -- Take PlumberSchool in constructor (although that will need jackson injectables for stuff like
// TODO -- the ServerView, which seems kind of odd?)
final RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
windowPeriod,
new File(toolbox.getTaskDir(), "persist"),
@@ -125,12 +132,19 @@ public class RealtimeIndexTask extends AbstractTask
final SegmentPublisher segmentPublisher = new TaskActionSegmentPublisher(this, toolbox);
// TODO -- We're adding stuff to talk to the coordinator in various places in the plumber, and may
// TODO -- want to be more robust to coordinator downtime (currently we'll block/throw in whatever
// TODO -- thread triggered the coordinator behavior, which will typically be either the main
// TODO -- data processing loop or the persist thread)
// Wrap default SegmentAnnouncer such that we unlock intervals as we unannounce segments
final SegmentAnnouncer lockingSegmentAnnouncer = new SegmentAnnouncer()
{
@Override
public void announceSegment(final DataSegment segment) throws IOException
{
// NOTE: Side effect: Calling announceSegment causes a lock to be acquired
toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval()));
toolbox.getSegmentAnnouncer().announceSegment(segment);
}
@@ -146,8 +160,10 @@ public class RealtimeIndexTask extends AbstractTask
};
// TODO -- This can block if there is lock contention, which will block plumber.getSink (and thus the firehose)
// TODO -- Shouldn't usually be bad, since we don't expect people to submit tasks that intersect with the
// TODO -- realtime window, but if they do it can be problematic
// TODO -- Shouldn't usually happen, since we don't expect people to submit tasks that intersect with the
// TODO -- realtime window, but if they do it can be problematic.
// TODO -- If we decide to care, we can use more threads in the plumber such that waiting for the coordinator
// TODO -- doesn't block data processing.
final RealtimePlumberSchool.VersioningPolicy versioningPolicy = new RealtimePlumberSchool.VersioningPolicy()
{
@Override

View File

@@ -31,6 +31,7 @@ import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
@@ -274,20 +275,24 @@ public class RemoteTaskRunner implements TaskRunner
private void retryTask(final TaskRunnerWorkItem taskRunnerWorkItem, final String workerId)
{
final String taskId = taskRunnerWorkItem.getTask().getId();
log.info("Retry scheduled in %s for %s", taskRunnerWorkItem.getRetryPolicy().getRetryDelay(), taskId);
scheduledExec.schedule(
new Runnable()
{
@Override
public void run()
if (!taskRunnerWorkItem.getRetryPolicy().hasExceededRetryThreshold()) {
log.info("Retry scheduled in %s for %s", taskRunnerWorkItem.getRetryPolicy().getRetryDelay(), taskId);
scheduledExec.schedule(
new Runnable()
{
cleanup(workerId, taskId);
addPendingTask(taskRunnerWorkItem);
}
},
taskRunnerWorkItem.getRetryPolicy().getAndIncrementRetryDelay().getMillis(),
TimeUnit.MILLISECONDS
);
@Override
public void run()
{
cleanup(workerId, taskId);
addPendingTask(taskRunnerWorkItem);
}
},
taskRunnerWorkItem.getRetryPolicy().getAndIncrementRetryDelay().getMillis(),
TimeUnit.MILLISECONDS
);
} else {
log.makeAlert("Task exceeded retry threshold").addData("task", taskId).emit();
}
}
/**

View File

@@ -20,6 +20,7 @@
package com.metamx.druid.merger.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.druid.merger.common.RetryPolicy;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.task.Task;
import org.joda.time.DateTime;

View File

@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
@@ -40,7 +41,6 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.RegisteringNode;
import com.metamx.druid.client.ClientConfig;
import com.metamx.druid.client.ClientInventoryManager;
import com.metamx.druid.client.MutableServerView;
@@ -73,7 +73,7 @@ import com.metamx.druid.merger.coordinator.HeapMemoryTaskStorage;
import com.metamx.druid.merger.coordinator.LocalTaskRunner;
import com.metamx.druid.merger.coordinator.MergerDBCoordinator;
import com.metamx.druid.merger.coordinator.RemoteTaskRunner;
import com.metamx.druid.merger.coordinator.RetryPolicyFactory;
import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.druid.merger.coordinator.TaskLockbox;
import com.metamx.druid.merger.coordinator.TaskMasterLifecycle;
import com.metamx.druid.merger.coordinator.TaskQueue;
@@ -85,7 +85,7 @@ import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.NoopAutoScalingStrategy;
@@ -95,7 +95,6 @@ import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerFa
import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagementStrategy;
import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagmentConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.worker.http.WorkerNode;
import com.metamx.druid.realtime.MetadataUpdaterConfig;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.druid.realtime.ZkSegmentAnnouncer;
@@ -127,7 +126,6 @@ import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.DBI;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
@@ -636,7 +634,12 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
curatorFramework,
new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), true),
retryScheduledExec,
new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)),
new RetryPolicyFactory(
configFactory.buildWithReplacements(
RetryPolicyConfig.class,
ImmutableMap.of("base_path", "druid.indexing")
)
),
configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class)
);

View File

@@ -22,6 +22,7 @@ package com.metamx.druid.merger.worker.http;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.ISE;
@@ -32,7 +33,6 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.Query;
import com.metamx.druid.client.ClientConfig;
import com.metamx.druid.client.ClientInventoryManager;
import com.metamx.druid.client.MutableServerView;
@@ -45,23 +45,20 @@ import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.worker.WorkerTaskMonitor;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
import com.metamx.druid.merger.worker.WorkerTaskMonitor;
import com.metamx.druid.merger.worker.config.WorkerConfig;
import com.metamx.druid.query.NoopQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.segment.QuerySegmentWalker;
import com.metamx.druid.query.segment.SegmentDescriptor;
import com.metamx.druid.realtime.MetadataUpdaterConfig;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.druid.realtime.ZkSegmentAnnouncer;
@@ -84,7 +81,6 @@ import com.netflix.curator.x.discovery.ServiceProvider;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.joda.time.Interval;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
@@ -388,7 +384,17 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
lifecycle.addManagedInstance(segmentAnnouncer);
taskToolboxFactory = new TaskToolboxFactory(
taskConfig,
new RemoteTaskActionClientFactory(httpClient, coordinatorServiceProvider, getJsonMapper()),
new RemoteTaskActionClientFactory(
httpClient,
coordinatorServiceProvider,
new RetryPolicyFactory(
configFactory.buildWithReplacements(
RetryPolicyConfig.class,
ImmutableMap.of("base_path", "druid.worker.taskActionClient")
)
),
getJsonMapper()
),
emitter,
s3Service,
segmentPusher,

View File

@@ -9,13 +9,14 @@ import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.TestTask;
import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;

View File

@@ -1,6 +1,7 @@
package com.metamx.druid.merger.coordinator;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.RetryPolicy;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import junit.framework.Assert;
import org.joda.time.Duration;
import org.junit.Test;