Multiple bug fixes for the indexing service; add SCV quotes to log messages

This commit is contained in:
fjy 2013-06-06 14:10:18 -07:00
parent 451d3d358b
commit e4ea357b52
11 changed files with 80 additions and 96 deletions

View File

@ -41,8 +41,8 @@ import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.indexing.common.RetryPolicy;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.indexing.worker.Worker;
@ -164,14 +164,14 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
event.getData().getData(),
Worker.class
);
log.info("New worker[%s] found!", worker.getHost());
log.info("Worker[%s] reportin' for duty!", worker.getHost());
addWorker(worker);
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
final Worker worker = jsonMapper.readValue(
event.getData().getData(),
Worker.class
);
log.info("Worker[%s] removed!", worker.getHost());
log.info("Kaboom! Worker[%s] removed!", worker.getHost());
removeWorker(worker);
}
}
@ -352,7 +352,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
Preconditions.checkArgument(path.startsWith("/"), "path must start with '/': %s", path);
try {
return new URL(String.format("http://%s/mmx/worker/v1%s", worker.getHost(), path));
return new URL(String.format("http://%s/druid/worker/v1%s", worker.getHost(), path));
}
catch (MalformedURLException e) {
throw Throwables.propagate(e);
@ -555,7 +555,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
final ZkWorker zkWorker = new ZkWorker(
worker,
statusCache
statusCache,
jsonMapper
);
// Add status listener to the watcher for status changes
@ -598,8 +599,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
worker.getHost(),
taskId
);
} else {
zkWorker.addTask(taskRunnerWorkItem);
}
if (taskStatus.isComplete()) {
@ -608,7 +607,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
if (result != null) {
((SettableFuture<TaskStatus>) result).set(taskStatus);
}
zkWorker.removeTask(taskRunnerWorkItem);
}
// Worker is done with this task
@ -621,7 +619,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem != null) {
log.info("Task %s just disappeared!", taskId);
zkWorker.removeTask(taskRunnerWorkItem);
retryTask(taskRunnerWorkItem);
}
}
@ -711,12 +708,17 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
@Override
public boolean apply(ZkWorker input)
{
for (String taskId : input.getRunningTasks()) {
TaskRunnerWorkItem workerTask = runningTasks.get(taskId);
if (workerTask != null && task.getAvailabilityGroup()
.equalsIgnoreCase(workerTask.getTask().getAvailabilityGroup())) {
return false;
}
}
return (!input.isAtCapacity() &&
input.getWorker()
.getVersion()
.compareTo(workerSetupData.get().getMinVersion()) >= 0 &&
!input.getAvailabilityGroups().contains(task.getAvailabilityGroup())
);
.compareTo(workerSetupData.get().getMinVersion()) >= 0);
}
}
)

View File

@ -20,11 +20,19 @@
package com.metamx.druid.indexing.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.worker.Worker;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
@ -36,17 +44,27 @@ public class ZkWorker implements Closeable
{
private final Worker worker;
private final PathChildrenCache statusCache;
private final Set<String> runningTasks;
private final Set<String> availabilityGroups;
private final Function<ChildData, String> cacheConverter;
private volatile DateTime lastCompletedTaskTime = new DateTime();
public ZkWorker(Worker worker, PathChildrenCache statusCache)
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{
this.worker = worker;
this.statusCache = statusCache;
this.runningTasks = Sets.newHashSet();
this.availabilityGroups = Sets.newHashSet();
// Maps one status-cache entry to the id of the task it describes: the entry's
// payload is deserialized as a TaskStatus using the jsonMapper passed to the
// constructor, and the status's id is returned.
this.cacheConverter = new Function<ChildData, String>()
{
@Override
public String apply(@Nullable ChildData input)
{
try {
// NOTE(review): input is declared @Nullable but is dereferenced here without a
// null check; a null entry would be caught below and rethrown -- confirm that
// callers never supply null elements.
return jsonMapper.readValue(input.getData(), TaskStatus.class).getId();
}
catch (Exception e) {
// Every failure while reading or parsing the payload is rethrown via
// Throwables.propagate rather than being swallowed.
throw Throwables.propagate(e);
}
}
};
}
@JsonProperty
@ -58,13 +76,12 @@ public class ZkWorker implements Closeable
@JsonProperty
public Set<String> getRunningTasks()
{
return runningTasks;
}
@JsonProperty
public Set<String> getAvailabilityGroups()
{
return availabilityGroups;
return Sets.newHashSet(
Lists.transform(
statusCache.getCurrentData(),
cacheConverter
)
);
}
@JsonProperty
@ -76,7 +93,7 @@ public class ZkWorker implements Closeable
@JsonProperty
public boolean isAtCapacity()
{
return runningTasks.size() >= worker.getCapacity();
return statusCache.getCurrentData().size() >= worker.getCapacity();
}
public void setLastCompletedTaskTime(DateTime completedTaskTime)
@ -84,18 +101,6 @@ public class ZkWorker implements Closeable
lastCompletedTaskTime = completedTaskTime;
}
public void addTask(TaskRunnerWorkItem item)
{
runningTasks.add(item.getTask().getId());
availabilityGroups.add(item.getTask().getAvailabilityGroup());
}
public void removeTask(TaskRunnerWorkItem item)
{
runningTasks.remove(item.getTask().getId());
availabilityGroups.remove(item.getTask().getAvailabilityGroup());
}
@Override
public void close() throws IOException
{
@ -107,8 +112,6 @@ public class ZkWorker implements Closeable
{
return "ZkWorker{" +
"worker=" + worker +
", runningTasks=" + runningTasks +
", availabilityGroups=" + availabilityGroups +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
'}';
}

View File

@ -311,7 +311,8 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
final Context root = new Context(server, "/", Context.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(new ServletHolder(new DefaultServlet()), "/mmx/*");
root.addServlet(new ServletHolder(new DefaultServlet()), "/druid/*");
root.addServlet(new ServletHolder(new DefaultServlet()), "/mmx/*"); // backwards compatibility
root.addEventListener(new GuiceServletConfig(injector));
root.addFilter(
new FilterHolder(
@ -346,7 +347,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
), "/*", 0
);
root.addFilter(GuiceFilter.class, "/druid/indexer/v1/*", 0);
root.addFilter(GuiceFilter.class, "/mmx/merger/v1/*", 0); //backwards compatability, soon to be removed
root.addFilter(GuiceFilter.class, "/mmx/merger/v1/*", 0); //backwards compatibility, soon to be removed
initialized = true;
}

View File

@ -23,10 +23,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.config.IndexerZkConfig;
import org.apache.curator.framework.CuratorFramework;
@ -52,6 +54,7 @@ public class WorkerCuratorCoordinator
private final CuratorFramework curatorFramework;
private final Worker worker;
private final IndexerZkConfig config;
private final Announcer announcer;
private final String baseAnnouncementsPath;
private final String baseTaskPath;
@ -71,6 +74,8 @@ public class WorkerCuratorCoordinator
this.worker = worker;
this.config = config;
this.announcer = new Announcer(curatorFramework, MoreExecutors.sameThreadExecutor());
this.baseAnnouncementsPath = getPath(Arrays.asList(config.getIndexerAnnouncementPath(), worker.getHost()));
this.baseTaskPath = getPath(Arrays.asList(config.getIndexerTaskPath(), worker.getHost()));
this.baseStatusPath = getPath(Arrays.asList(config.getIndexerStatusPath(), worker.getHost()));
@ -79,7 +84,7 @@ public class WorkerCuratorCoordinator
@LifecycleStart
public void start() throws Exception
{
log.info("Starting WorkerCuratorCoordinator for server[%s]", worker.getHost());
log.info("WorkerCuratorCoordinator good to go sir. Server[%s]", worker.getHost());
synchronized (lock) {
if (started) {
return;
@ -95,33 +100,8 @@ public class WorkerCuratorCoordinator
CreateMode.PERSISTENT,
ImmutableMap.of("created", new DateTime().toString())
);
makePathIfNotExisting(
getAnnouncementsPathForWorker(),
CreateMode.EPHEMERAL,
worker
);
curatorFramework.getConnectionStateListenable().addListener(
new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
try {
if (newState.equals(ConnectionState.RECONNECTED)) {
makePathIfNotExisting(
getAnnouncementsPathForWorker(),
CreateMode.EPHEMERAL,
worker
);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
announcer.start();
announcer.announce(getAnnouncementsPathForWorker(), jsonMapper.writeValueAsBytes(worker));
started = true;
}
@ -136,9 +116,8 @@ public class WorkerCuratorCoordinator
return;
}
curatorFramework.delete()
.guaranteed()
.forPath(getAnnouncementsPathForWorker());
announcer.unannounce(getAnnouncementsPathForWorker());
announcer.stop();
started = false;
}
@ -193,16 +172,6 @@ public class WorkerCuratorCoordinator
return getPath(Arrays.asList(baseStatusPath, statusId));
}
public boolean statusExists(String id)
{
try {
return (curatorFramework.checkExists().forPath(getStatusPathForId(id)) != null);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public void unannounceTask(String taskId)
{
try {

View File

@ -39,7 +39,7 @@ import java.util.concurrent.ExecutorService;
/**
* The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be
* created that waits for new tasks. Tasks are executed as soon as they are seen.
*
* <p/>
* The monitor implements {@link QuerySegmentWalker} so tasks can offer up queryable data. This is useful for
* realtime index tasks.
*/
@ -96,7 +96,10 @@ public class WorkerTaskMonitor
);
if (isTaskRunning(task)) {
log.warn("Got task %s that I am already running...", task.getId());
log.warn(
"I can't build it. There's something in the way. Got task %s that I am already running...",
task.getId()
);
workerCuratorCoordinator.unannounceTask(task.getId());
return;
}
@ -109,7 +112,7 @@ public class WorkerTaskMonitor
{
final long startTime = System.currentTimeMillis();
log.info("Running task [%s]", task.getId());
log.info("Affirmative. Running task [%s]", task.getId());
running.add(task);
TaskStatus taskStatus;
@ -119,11 +122,12 @@ public class WorkerTaskMonitor
taskStatus = taskRunner.run(task).get();
}
catch (Exception e) {
log.makeAlert(e, "Failed to run task")
log.makeAlert(e, "I can't build there. Failed to run task")
.addData("task", task.getId())
.emit();
taskStatus = TaskStatus.failure(task.getId());
} finally {
}
finally {
running.remove(task);
}
@ -131,7 +135,11 @@ public class WorkerTaskMonitor
try {
workerCuratorCoordinator.updateStatus(taskStatus);
log.info("Completed task [%s] with status [%s]", task.getId(), taskStatus.getStatusCode());
log.info(
"Job's finished. Completed [%s] with status [%s]",
task.getId(),
taskStatus.getStatusCode()
);
}
catch (Exception e) {
log.makeAlert(e, "Failed to update task status")

View File

@ -10,7 +10,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;
@Path("/mmx/worker/v1")
@Path("/druid/worker/v1")
public class ChatHandlerResource
{
private final ObjectMapper jsonMapper;

View File

@ -220,7 +220,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addEventListener(new GuiceServletConfig(injector));
root.addFilter(GuiceFilter.class, "/mmx/worker/v1/*", 0);
root.addFilter(GuiceFilter.class, "/druid/worker/v1/*", 0);
root.addServlet(
new ServletHolder(
new QueryServlet(getJsonMapper(), getSmileMapper(), taskRunner, emitter, getRequestLogger())

View File

@ -209,7 +209,7 @@ public class WorkerNode extends QueryableNode<WorkerNode>
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addEventListener(new GuiceServletConfig(injector));
root.addFilter(GuiceFilter.class, "/mmx/worker/v1/*", 0);
root.addFilter(GuiceFilter.class, "/druid/worker/v1/*", 0);
}
@LifecycleStart

View File

@ -39,7 +39,7 @@ import java.io.InputStream;
/**
*/
@Path("/mmx/worker/v1")
@Path("/druid/worker/v1")
public class WorkerResource
{
private static final Logger log = new Logger(WorkerResource.class);

View File

@ -10,7 +10,7 @@ import com.google.common.util.concurrent.Futures;
import com.metamx.common.ISE;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.indexing.TestTask;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.TaskStatus;
@ -23,14 +23,15 @@ import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.indexing.worker.Worker;
import com.metamx.druid.indexing.worker.WorkerCuratorCoordinator;
import com.metamx.druid.indexing.worker.WorkerTaskMonitor;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
@ -42,7 +43,6 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -81,6 +81,7 @@ public class RemoteTaskRunnerTest
cf = CuratorFrameworkFactory.builder()
.connectString(testingCluster.getConnectString())
.retryPolicy(new ExponentialBackoffRetry(1, 10))
.compressionProvider(new PotentiallyGzippedCompressionProvider(false))
.build();
cf.start();

View File

@ -334,7 +334,7 @@ public class SimpleResourceManagementStrategyTest
Task testTask
)
{
super(new Worker("host", "ip", 3, "version"), null);
super(new Worker("host", "ip", 3, "version"), null, null);
this.testTask = testTask;
}