Merge pull request #2249 from metamx/workerExpanded

Use Worker instead of ZkWorker whenever possible
This commit is contained in:
Nishant 2016-02-24 13:23:22 +05:30
commit fb7eae34ed
22 changed files with 715 additions and 141 deletions

View File

@ -578,6 +578,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
return Optional.absent();
}
@Override
public void start()
{
// No state setup required
}
@Override
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset)
{

View File

@ -22,6 +22,7 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@ -29,6 +30,7 @@ import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@ -42,7 +44,6 @@ import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.RE;
import com.metamx.common.lifecycle.LifecycleStart;
@ -53,7 +54,6 @@ import com.metamx.http.client.Request;
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.concurrent.Execs;
import io.druid.curator.CuratorUtils;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.TaskStatus;
@ -113,7 +113,7 @@ import java.util.concurrent.TimeUnit;
* <p/>
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
*/
public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
{
private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class);
private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8);
@ -153,7 +153,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
private final ListeningScheduledExecutorService cleanupExec;
private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
private final ResourceManagementStrategy<RemoteTaskRunner> resourceManagement;
private final ResourceManagementStrategy<WorkerTaskRunner> resourceManagement;
public RemoteTaskRunner(
ObjectMapper jsonMapper,
@ -164,7 +164,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorService cleanupExec,
ResourceManagementStrategy<RemoteTaskRunner> resourceManagement
ResourceManagementStrategy<WorkerTaskRunner> resourceManagement
)
{
this.jsonMapper = jsonMapper;
@ -180,6 +180,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
this.resourceManagement = resourceManagement;
}
@Override
@LifecycleStart
public void start()
{
@ -298,6 +299,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
}
}
@Override
@LifecycleStop
public void stop()
{
@ -325,7 +327,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return ImmutableList.of();
}
public Collection<ZkWorker> getWorkers()
@Override
public Collection<Worker> getWorkers()
{
return ImmutableList.copyOf(getWorkerFromZK(zkWorkers.values()));
}
public Collection<ZkWorker> getZkWorkers()
{
return ImmutableList.copyOf(zkWorkers.values());
}
@ -1018,7 +1026,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
taskRunnerWorkItem.setResult(taskStatus);
}
public List<ZkWorker> markWorkersLazy(Predicate<ZkWorker> isLazyWorker, int maxWorkers)
@Override
public Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int maxWorkers)
{
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
synchronized (statusLock) {
@ -1027,7 +1036,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
String worker = iterator.next();
ZkWorker zkWorker = zkWorkers.get(worker);
try {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker)) {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.getWorker())) {
log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
lazyWorkers.put(worker, zkWorker);
if (lazyWorkers.size() == maxWorkers) {
@ -1040,13 +1049,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
throw Throwables.propagate(e);
}
}
return ImmutableList.copyOf(lazyWorkers.values());
return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
}
}
private List<String> getAssignedTasks(Worker worker) throws Exception
protected List<String> getAssignedTasks(Worker worker) throws Exception
{
List<String> assignedTasks = Lists.newArrayList(
final List<String> assignedTasks = Lists.newArrayList(
cf.getChildren().forPath(JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost()))
);
@ -1066,9 +1075,25 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return assignedTasks;
}
public List<ZkWorker> getLazyWorkers()
@Override
public Collection<Worker> getLazyWorkers()
{
return ImmutableList.copyOf(lazyWorkers.values());
return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
}
public static Collection<Worker> getWorkerFromZK(Collection<ZkWorker> workers)
{
return Collections2.transform(
workers,
new Function<ZkWorker, Worker>()
{
@Override
public Worker apply(ZkWorker input)
{
return input.getWorker();
}
}
);
}
@VisibleForTesting

View File

@ -43,6 +43,7 @@ import java.util.concurrent.ScheduledExecutorService;
*/
public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunner>
{
public static final String TYPE_NAME = "remote";
private static final Logger LOG = new Logger(RemoteTaskRunnerFactory.class);
private final CuratorFramework curator;
private final RemoteTaskRunnerConfig remoteTaskRunnerConfig;
@ -83,7 +84,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
@Override
public RemoteTaskRunner build()
{
final ResourceManagementStrategy<RemoteTaskRunner> resourceManagementStrategy;
final ResourceManagementStrategy<WorkerTaskRunner> resourceManagementStrategy;
if (resourceManagementSchedulerConfig.isDoAutoscale()) {
resourceManagementStrategy = new SimpleResourceManagementStrategy(
config,

View File

@ -31,6 +31,7 @@ import java.util.List;
/**
* Interface for handing off tasks. Managed by a {@link io.druid.indexing.overlord.TaskQueue}.
* Holds state
*/
public interface TaskRunner
{
@ -75,4 +76,9 @@ public interface TaskRunner
* @return ScalingStats if the runner has an underlying resource which can scale, Optional.absent() otherwise
*/
Optional<ScalingStats> getScalingStats();
/**
* Start the state of the runner
*/
void start();
}

View File

@ -102,6 +102,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
);
}
@Override
@LifecycleStop
public void stop()
{
@ -259,6 +260,12 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
return Optional.absent();
}
@Override
public void start()
{
// No state startup required
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{

View File

@ -0,0 +1,47 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;
import com.google.common.base.Predicate;
import io.druid.indexing.worker.Worker;
import java.util.Collection;
public interface WorkerTaskRunner extends TaskRunner
{
/**
* List of known workers who can accept tasks
* @return A list of workers who can accept tasks for running
*/
Collection<Worker> getWorkers();
/**
* Return a list of workers who can be reaped by autoscaling
* @return Workers which can be reaped by autoscaling
*/
Collection<Worker> getLazyWorkers();
/**
* Check which workers can be marked as lazy
* @param isLazyWorker
* @param maxWorkers
* @return
*/
Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int maxWorkers);
}

View File

@ -50,7 +50,6 @@ public class ZkWorker implements Closeable
private final Function<ChildData, TaskAnnouncement> cacheConverter;
private AtomicReference<Worker> worker;
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{
@ -129,7 +128,7 @@ public class ZkWorker implements Closeable
@JsonProperty
public DateTime getLastCompletedTaskTime()
{
return lastCompletedTaskTime.get();
return worker.get().getLastCompletedTaskTime();
}
public boolean isRunningTask(String taskId)
@ -139,7 +138,7 @@ public class ZkWorker implements Closeable
public boolean isValidVersion(String minVersion)
{
return worker.get().getVersion().compareTo(minVersion) >= 0;
return worker.get().isValidVersion(minVersion);
}
public void setWorker(Worker newWorker)
@ -153,7 +152,7 @@ public class ZkWorker implements Closeable
public void setLastCompletedTaskTime(DateTime completedTaskTime)
{
lastCompletedTaskTime.set(completedTaskTime);
worker.get().setLastCompletedTaskTime(completedTaskTime);
}
public ImmutableZkWorker toImmutable()
@ -172,7 +171,6 @@ public class ZkWorker implements Closeable
{
return "ZkWorker{" +
"worker=" + worker +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
'}';
}
}

View File

@ -24,6 +24,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@ -33,10 +34,10 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger;
import io.druid.granularity.PeriodGranularity;
import io.druid.indexing.overlord.RemoteTaskRunner;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
@ -48,7 +49,7 @@ import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class SimpleResourceManagementStrategy implements ResourceManagementStrategy<RemoteTaskRunner>
public class SimpleResourceManagementStrategy implements ResourceManagementStrategy<WorkerTaskRunner>
{
private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class);
@ -99,10 +100,10 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
this.exec = exec;
}
boolean doProvision(RemoteTaskRunner runner)
boolean doProvision(WorkerTaskRunner runner)
{
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
Collection<ZkWorker> zkWorkers = getWorkers(runner);
Collection<Worker> workers = getWorkers(runner);
synchronized (lock) {
boolean didProvision = false;
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
@ -110,19 +111,19 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
log.warn("No workerConfig available, cannot provision new workers.");
return false;
}
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config);
final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size();
final Predicate<Worker> isValidWorker = createValidWorkerPredicate(config);
final int currValidWorkers = Collections2.filter(workers, isValidWorker).size();
final List<String> workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup(
Lists.newArrayList(
Iterables.<ZkWorker, String>transform(
zkWorkers,
new Function<ZkWorker, String>()
Iterables.transform(
workers,
new Function<Worker, String>()
{
@Override
public String apply(ZkWorker input)
public String apply(Worker input)
{
return input.getWorker().getIp();
return input.getIp();
}
}
)
@ -130,7 +131,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
);
currentlyProvisioning.removeAll(workerNodeIds);
updateTargetWorkerCount(workerConfig, pendingTasks, zkWorkers);
updateTargetWorkerCount(workerConfig, pendingTasks, workers);
int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size());
while (want > 0) {
@ -167,7 +168,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
}
boolean doTerminate(RemoteTaskRunner runner)
boolean doTerminate(WorkerTaskRunner runner)
{
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
synchronized (lock) {
@ -183,12 +184,12 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
Lists.newArrayList(
Iterables.transform(
runner.getLazyWorkers(),
new Function<ZkWorker, String>()
new Function<Worker, String>()
{
@Override
public String apply(ZkWorker input)
public String apply(Worker input)
{
return input.getWorker().getIp();
return input.getIp();
}
}
)
@ -205,23 +206,23 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
currentlyTerminating.clear();
currentlyTerminating.addAll(stillExisting);
Collection<ZkWorker> workers = getWorkers(runner);
Collection<Worker> workers = getWorkers(runner);
updateTargetWorkerCount(workerConfig, pendingTasks, workers);
if (currentlyTerminating.isEmpty()) {
final int excessWorkers = (workers.size() + currentlyProvisioning.size()) - targetWorkerCount;
if (excessWorkers > 0) {
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config);
final List<String> laziestWorkerIps =
Lists.transform(
final Predicate<Worker> isLazyWorker = createLazyWorkerPredicate(config);
final Collection<String> laziestWorkerIps =
Collections2.transform(
runner.markWorkersLazy(isLazyWorker, excessWorkers),
new Function<ZkWorker, String>()
new Function<Worker, String>()
{
@Override
public String apply(ZkWorker zkWorker)
public String apply(Worker zkWorker)
{
return zkWorker.getWorker().getIp();
return zkWorker.getIp();
}
}
);
@ -235,7 +236,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
Joiner.on(", ").join(laziestWorkerIps)
);
final AutoScalingData terminated = workerConfig.getAutoScaler().terminate(laziestWorkerIps);
final AutoScalingData terminated = workerConfig.getAutoScaler().terminate(ImmutableList.copyOf(laziestWorkerIps));
if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds());
lastTerminateTime = new DateTime();
@ -264,7 +265,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
@Override
public void startManagement(final RemoteTaskRunner runner)
public void startManagement(final WorkerTaskRunner runner)
{
synchronized (lock) {
if (started) {
@ -333,16 +334,16 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
return scalingStats;
}
private static Predicate<ZkWorker> createLazyWorkerPredicate(
private static Predicate<Worker> createLazyWorkerPredicate(
final SimpleResourceManagementConfig config
)
{
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config);
final Predicate<Worker> isValidWorker = createValidWorkerPredicate(config);
return new Predicate<ZkWorker>()
return new Predicate<Worker>()
{
@Override
public boolean apply(ZkWorker worker)
public boolean apply(Worker worker)
{
final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis()
>= config.getWorkerIdleTimeout().toStandardDuration().getMillis();
@ -351,20 +352,20 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
};
}
private static Predicate<ZkWorker> createValidWorkerPredicate(
private static Predicate<Worker> createValidWorkerPredicate(
final SimpleResourceManagementConfig config
)
{
return new Predicate<ZkWorker>()
return new Predicate<Worker>()
{
@Override
public boolean apply(ZkWorker zkWorker)
public boolean apply(Worker worker)
{
final String minVersion = config.getWorkerVersion();
if (minVersion == null) {
throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database.");
}
return zkWorker.isValidVersion(minVersion);
return worker.isValidVersion(minVersion);
}
};
}
@ -372,15 +373,15 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
private void updateTargetWorkerCount(
final WorkerBehaviorConfig workerConfig,
final Collection<? extends TaskRunnerWorkItem> pendingTasks,
final Collection<ZkWorker> zkWorkers
final Collection<Worker> zkWorkers
)
{
synchronized (lock) {
final Collection<ZkWorker> validWorkers = Collections2.filter(
final Collection<Worker> validWorkers = Collections2.filter(
zkWorkers,
createValidWorkerPredicate(config)
);
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config);
final Predicate<Worker> isLazyWorker = createLazyWorkerPredicate(config);
final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
@ -463,7 +464,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
}
public Collection<ZkWorker> getWorkers(RemoteTaskRunner runner)
public Collection<Worker> getWorkers(WorkerTaskRunner runner)
{
return runner.getWorkers();
}

View File

@ -72,4 +72,43 @@ public class RemoteTaskRunnerConfig
{
return taskShutdownLinkTimeout;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RemoteTaskRunnerConfig that = (RemoteTaskRunnerConfig) o;
if (getMaxZnodeBytes() != that.getMaxZnodeBytes()) {
return false;
}
if (!getTaskAssignmentTimeout().equals(that.getTaskAssignmentTimeout())) {
return false;
}
if (!getTaskCleanupTimeout().equals(that.getTaskCleanupTimeout())) {
return false;
}
if (!getMinWorkerVersion().equals(that.getMinWorkerVersion())) {
return false;
}
return getTaskShutdownLinkTimeout().equals(that.getTaskShutdownLinkTimeout());
}
@Override
public int hashCode()
{
int result = getTaskAssignmentTimeout().hashCode();
result = 31 * result + getTaskCleanupTimeout().hashCode();
result = 31 * result + getMinWorkerVersion().hashCode();
result = 31 * result + getMaxZnodeBytes();
result = 31 * result + getTaskShutdownLinkTimeout().hashCode();
return result;
}
}

View File

@ -21,6 +21,9 @@ package io.druid.indexing.worker;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTime;
import java.util.concurrent.atomic.AtomicReference;
/**
* A container for worker metadata.
@ -31,19 +34,24 @@ public class Worker
private final String ip;
private final int capacity;
private final String version;
private final AtomicReference<DateTime> lastCompletedTaskTime;
@JsonCreator
public Worker(
@JsonProperty("host") String host,
@JsonProperty("ip") String ip,
@JsonProperty("capacity") int capacity,
@JsonProperty("version") String version
@JsonProperty("version") String version,
@JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime
)
{
this.host = host;
this.ip = ip;
this.capacity = capacity;
this.version = version;
this.lastCompletedTaskTime = new AtomicReference<>(lastCompletedTaskTime == null
? DateTime.now()
: lastCompletedTaskTime);
}
@JsonProperty
@ -70,6 +78,22 @@ public class Worker
return version;
}
@JsonProperty
public DateTime getLastCompletedTaskTime()
{
return lastCompletedTaskTime.get();
}
public void setLastCompletedTaskTime(DateTime completedTaskTime)
{
lastCompletedTaskTime.set(completedTaskTime);
}
public boolean isValidVersion(final String minVersion)
{
return getVersion().compareTo(minVersion) >= 0;
}
@Override
public String toString()
{

View File

@ -27,10 +27,11 @@ import com.google.common.collect.Lists;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.indexing.overlord.ForkingTaskRunner;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.worker.Worker;
import io.druid.indexing.worker.WorkerCuratorCoordinator;
import io.druid.tasklogs.TaskLogStreamer;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
@ -52,20 +53,18 @@ public class WorkerResource
private static String DISABLED_VERSION = "";
private final Worker enabledWorker;
private final Worker disabledWorker;
private final WorkerCuratorCoordinator curatorCoordinator;
private final ForkingTaskRunner taskRunner;
private final TaskRunner taskRunner;
@Inject
public WorkerResource(
Worker worker,
WorkerCuratorCoordinator curatorCoordinator,
ForkingTaskRunner taskRunner
TaskRunner taskRunner
) throws Exception
{
this.enabledWorker = worker;
this.disabledWorker = new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), DISABLED_VERSION);
this.curatorCoordinator = curatorCoordinator;
this.taskRunner = taskRunner;
}
@ -77,6 +76,13 @@ public class WorkerResource
public Response doDisable()
{
try {
final Worker disabledWorker = new Worker(
enabledWorker.getHost(),
enabledWorker.getIp(),
enabledWorker.getCapacity(),
DISABLED_VERSION,
enabledWorker.getLastCompletedTaskTime()
);
curatorCoordinator.updateWorkerAnnouncement(disabledWorker);
return Response.ok(ImmutableMap.of(disabledWorker.getHost(), "disabled")).build();
}
@ -164,18 +170,26 @@ public class WorkerResource
@QueryParam("offset") @DefaultValue("0") long offset
)
{
final Optional<ByteSource> stream = taskRunner.streamTaskLog(taskid, offset);
if (!(taskRunner instanceof TaskLogStreamer)) {
return Response.status(501)
.entity(String.format(
"Log streaming not supported by [%s]",
taskRunner.getClass().getCanonicalName()
))
.build();
}
try {
final Optional<ByteSource> stream = ((TaskLogStreamer) taskRunner).streamTaskLog(taskid, offset);
if (stream.isPresent()) {
try {
return Response.ok(stream.get().openStream()).build();
} else {
return Response.status(Response.Status.NOT_FOUND).build();
}
}
catch (IOException e) {
log.warn(e, "Failed to read log for task: %s", taskid);
return Response.serverError().build();
}
} else {
return Response.status(Response.Status.NOT_FOUND).build();
}
}
}

View File

@ -56,6 +56,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
@ -358,7 +359,7 @@ public class RemoteTaskRunnerTest
doSetup();
final Set<String> existingTasks = Sets.newHashSet();
for (ZkWorker zkWorker : remoteTaskRunner.getWorkers()) {
for (ZkWorker zkWorker : remoteTaskRunner.getZkWorkers()) {
existingTasks.addAll(zkWorker.getRunningTasks().keySet());
}
Assert.assertEquals("existingTasks", ImmutableSet.of("first", "second"), existingTasks);
@ -450,7 +451,7 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
// Confirm RTR thinks the worker is disabled.
Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().getVersion());
Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getVersion());
}
private void doSetup() throws Exception
@ -479,7 +480,7 @@ public class RemoteTaskRunnerTest
null,
DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())),
ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"),
new NoopResourceManagementStrategy<RemoteTaskRunner>()
new NoopResourceManagementStrategy<WorkerTaskRunner>()
);
remoteTaskRunner.start();
@ -491,7 +492,8 @@ public class RemoteTaskRunnerTest
"worker",
"localhost",
3,
"0"
"0",
DateTime.now()
);
cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
@ -504,7 +506,7 @@ public class RemoteTaskRunnerTest
{
cf.setData().forPath(
announcementsPath,
jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), ""))
jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), "", DateTime.now()))
);
}
@ -575,11 +577,11 @@ public class RemoteTaskRunnerTest
remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<ZkWorker>()
Collection<Worker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<Worker>()
{
@Override
public boolean apply(ZkWorker input)
public boolean apply(Worker input)
{
return true;
}
@ -596,11 +598,11 @@ public class RemoteTaskRunnerTest
doSetup();
remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<ZkWorker>()
Collection<Worker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<Worker>()
{
@Override
public boolean apply(ZkWorker input)
public boolean apply(Worker input)
{
return true;
}
@ -615,11 +617,11 @@ public class RemoteTaskRunnerTest
public void testFindLazyWorkerNotRunningAnyTask() throws Exception
{
doSetup();
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<ZkWorker>()
Collection<Worker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<Worker>()
{
@Override
public boolean apply(ZkWorker input)
public boolean apply(Worker input)
{
return true;
}

View File

@ -142,8 +142,8 @@ public class SimpleResourceManagementStrategyTest
)
);
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask)
Collections.singletonList(
new TestZkWorker(testTask).getWorker()
)
);
EasyMock.replay(runner);
@ -178,8 +178,8 @@ public class SimpleResourceManagementStrategyTest
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask)
Collections.singletonList(
new TestZkWorker(testTask).getWorker()
)
).times(2);
EasyMock.replay(runner);
@ -235,8 +235,8 @@ public class SimpleResourceManagementStrategyTest
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask)
Collections.singletonList(
new TestZkWorker(testTask).getWorker()
)
).times(2);
EasyMock.replay(runner);
@ -286,16 +286,16 @@ public class SimpleResourceManagementStrategyTest
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask)
Collections.singletonList(
new TestZkWorker(testTask).getWorker()
)
).times(2);
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ZkWorker>>anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask)
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<Worker>>anyObject(), EasyMock.anyInt())).andReturn(
Collections.<Worker>singletonList(
new TestZkWorker(testTask).getWorker()
)
);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList());
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
EasyMock.replay(runner);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner);
@ -328,14 +328,14 @@ public class SimpleResourceManagementStrategyTest
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask)
Collections.singletonList(
new TestZkWorker(testTask).getWorker()
)
).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList()).times(2);
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ZkWorker>>anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask)
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList()).times(2);
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<Worker>>anyObject(), EasyMock.anyInt())).andReturn(
Collections.singletonList(
new TestZkWorker(testTask).getWorker()
)
);
EasyMock.replay(runner);
@ -377,14 +377,14 @@ public class SimpleResourceManagementStrategyTest
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
new TestZkWorker(NoopTask.create())
Arrays.asList(
new TestZkWorker(NoopTask.create()).getWorker(),
new TestZkWorker(NoopTask.create()).getWorker()
)
).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList());
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ZkWorker>>anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>emptyList()
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<Worker>>anyObject(), EasyMock.anyInt())).andReturn(
Collections.<Worker>emptyList()
);
EasyMock.replay(runner);
@ -422,13 +422,13 @@ public class SimpleResourceManagementStrategyTest
Collections.<RemoteTaskRunnerWorkItem>emptyList()
).times(3);
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.<ZkWorker>singletonList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0")
Collections.<Worker>singletonList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0").getWorker()
)
).times(3);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList());
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ZkWorker>>anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>emptyList()
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<Worker>>anyObject(), EasyMock.anyInt())).andReturn(
Collections.<Worker>emptyList()
);
EasyMock.replay(runner);
@ -486,8 +486,8 @@ public class SimpleResourceManagementStrategyTest
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.<ZkWorker>singletonList(
new TestZkWorker(null)
Collections.<Worker>singletonList(
new TestZkWorker(null).getWorker()
)
).times(1);
EasyMock.replay(runner);
@ -525,7 +525,7 @@ public class SimpleResourceManagementStrategyTest
String version
)
{
super(new Worker(host, ip, 3, version), null, new DefaultObjectMapper());
super(new Worker(host, ip, 3, version, DateTime.now()), null, new DefaultObjectMapper());
this.testTask = testTask;
}

View File

@ -0,0 +1,386 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class RemoteTaskRunnerConfigTest
{
private static final ObjectMapper mapper = new DefaultObjectMapper();
private static final Period DEFAULT_TIMEOUT = Period.ZERO;
private static final String DEFAULT_VERSION = "";
private static final long DEFAULT_MAX_ZNODE = 10 * 1024;
@Test
public void testGetTaskAssignmentTimeout() throws Exception
{
final Period timeout = Period.hours(1);
Assert.assertEquals(
timeout,
reflect(generateRemoteTaskRunnerConfig(
timeout,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
)).getTaskAssignmentTimeout()
);
}
@Test
public void testGetTaskCleanupTimeout() throws Exception
{
final Period timeout = Period.hours(1);
Assert.assertEquals(
timeout,
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
timeout,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
)).getTaskCleanupTimeout()
);
}
@Test
public void testGetMinWorkerVersion() throws Exception
{
final String version = "some version";
Assert.assertEquals(
version,
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT,
version,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
)).getMinWorkerVersion()
);
}
@Test
public void testGetMaxZnodeBytes() throws Exception
{
final long max = 20 * 1024;
Assert.assertEquals(
max,
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
max,
DEFAULT_TIMEOUT
)).getMaxZnodeBytes()
);
}
@Test
public void testGetTaskShutdownLinkTimeout() throws Exception
{
final Period timeout = Period.hours(1);
Assert.assertEquals(
timeout,
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
timeout
)).getTaskShutdownLinkTimeout()
);
}
@Test
public void testEquals() throws Exception
{
Assert.assertEquals(
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
)),
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
))
);
final Period timeout = Period.years(999);
final String version = "someVersion";
final long max = 20 * 1024;
Assert.assertEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
))
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)),
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
timeout,
version,
max,
timeout
))
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
DEFAULT_TIMEOUT,
version,
max,
timeout
))
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
DEFAULT_VERSION,
max,
timeout
))
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
DEFAULT_MAX_ZNODE,
timeout
))
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
DEFAULT_TIMEOUT
))
);
}
@Test
public void testHashCode() throws Exception
{
Assert.assertEquals(
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
)).hashCode()
);
final Period timeout = Period.years(999);
final String version = "someVersion";
final long max = 20 * 1024;
Assert.assertEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)).hashCode()
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
timeout,
version,
max,
timeout
)).hashCode()
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
DEFAULT_TIMEOUT,
version,
max,
timeout
)).hashCode()
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
DEFAULT_VERSION,
max,
timeout
)).hashCode()
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
DEFAULT_MAX_ZNODE,
timeout
)).hashCode()
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
DEFAULT_TIMEOUT
)).hashCode()
);
}
private RemoteTaskRunnerConfig reflect(RemoteTaskRunnerConfig config) throws IOException
{
return mapper.readValue(mapper.writeValueAsString(config), RemoteTaskRunnerConfig.class);
}
private RemoteTaskRunnerConfig generateRemoteTaskRunnerConfig(
Period taskAssignmentTimeout,
Period taskCleanupTimeout,
String minWorkerVersion,
long maxZnodeBytes,
Period taskShutdownLinkTimeout
)
{
final Map<String, Object> objectMap = new HashMap<>();
objectMap.put("taskAssignmentTimeout", taskAssignmentTimeout);
objectMap.put("taskCleanupTimeout", taskCleanupTimeout);
objectMap.put("minWorkerVersion", minWorkerVersion);
objectMap.put("maxZnodeBytes", maxZnodeBytes);
objectMap.put("taskShutdownLinkTimeout", taskShutdownLinkTimeout);
return mapper.convertValue(objectMap, RemoteTaskRunnerConfig.class);
}
}

View File

@ -73,6 +73,7 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
public class OverlordResourceTest
{
@ -274,6 +275,7 @@ public class OverlordResourceTest
private CountDownLatch[] runLatches;
private ConcurrentHashMap<String, TaskRunnerWorkItem> taskRunnerWorkItems;
private List<String> runningTasks;
private final AtomicBoolean started = new AtomicBoolean(false);
public MockTaskRunner(CountDownLatch[] runLatches, CountDownLatch[] completionLatches)
{
@ -289,12 +291,6 @@ public class OverlordResourceTest
return ImmutableList.of();
}
@Override
public void stop()
{
// Do nothing
}
@Override
public synchronized ListenableFuture<TaskStatus> run(final Task task)
{
@ -366,5 +362,17 @@ public class OverlordResourceTest
{
return Optional.absent();
}
@Override
public void start()
{
started.set(true);
}
@Override
public void stop()
{
started.set(false);
}
}
}

View File

@ -26,6 +26,7 @@ import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
@ -42,12 +43,12 @@ public class EqualDistributionWorkerSelectStrategyTest
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("lhost", "lhost", 1, "v1"), 0,
new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet()
),
"localhost",
new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1"), 1,
new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 1,
Sets.<String>newHashSet()
)
),
@ -75,12 +76,12 @@ public class EqualDistributionWorkerSelectStrategyTest
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 2,
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION, DateTime.now()), 2,
Sets.<String>newHashSet()
),
"localhost",
new ImmutableZkWorker(
new Worker("enableHost", "enableHost", 10, "v1"), 5,
new Worker("enableHost", "enableHost", 10, "v1", DateTime.now()), 5,
Sets.<String>newHashSet()
)
),
@ -108,12 +109,12 @@ public class EqualDistributionWorkerSelectStrategyTest
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 5,
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION, DateTime.now()), 5,
Sets.<String>newHashSet()
),
"localhost",
new ImmutableZkWorker(
new Worker("enableHost", "enableHost", 10, "v1"), 5,
new Worker("enableHost", "enableHost", 10, "v1", DateTime.now()), 5,
Sets.<String>newHashSet()
)
),

View File

@ -26,6 +26,7 @@ import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
@ -45,12 +46,12 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("lhost", "lhost", 1, "v1"), 0,
new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet()
),
"localhost",
new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1"), 0,
new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet()
)
),
@ -79,12 +80,12 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("lhost", "lhost", 1, "v1"), 0,
new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet()
),
"localhost",
new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1"), 0,
new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet()
)
),
@ -106,7 +107,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
ImmutableMap.of(
"localhost",
new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1"), 0,
new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet()
)
),

View File

@ -54,6 +54,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
@ -112,7 +113,8 @@ public class WorkerTaskMonitorTest
"worker",
"localhost",
3,
"0"
"0",
DateTime.now()
);
workerCuratorCoordinator = new WorkerCuratorCoordinator(

View File

@ -31,6 +31,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -73,7 +74,8 @@ public class WorkerResourceTest
"host",
"ip",
3,
"v1"
"v1",
DateTime.now()
);
curatorCoordinator = new WorkerCuratorCoordinator(

View File

@ -19,6 +19,7 @@
package io.druid.segment;
import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
@ -59,7 +60,7 @@ public class CloserRule implements TestRule
}
finally {
Throwable exception = null;
for (AutoCloseable autoCloseable : autoCloseables) {
for (AutoCloseable autoCloseable : Lists.reverse(autoCloseables)) {
try {
autoCloseable.close();
}

View File

@ -48,6 +48,7 @@ import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.server.DruidNode;
import io.druid.server.initialization.jetty.JettyServerInitializer;
import org.eclipse.jetty.server.Server;
import org.joda.time.DateTime;
import java.util.List;
@ -106,7 +107,8 @@ public class CliMiddleManager extends ServerRunnable
node.getHostAndPort(),
config.getIp(),
config.getCapacity(),
config.getVersion()
config.getVersion(),
DateTime.now()
);
}
},

View File

@ -60,6 +60,7 @@ import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskRunnerFactory;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig;
@ -197,7 +198,7 @@ public class CliOverlord extends ServerRunnable
biddy.addBinding("local").to(ForkingTaskRunnerFactory.class);
binder.bind(ForkingTaskRunnerFactory.class).in(LazySingleton.class);
biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
biddy.addBinding(RemoteTaskRunnerFactory.TYPE_NAME).to(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
JacksonConfigProvider.bind(binder, WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class, null);
@ -206,7 +207,7 @@ public class CliOverlord extends ServerRunnable
private void configureAutoscale(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class);
binder.bind(ResourceManagementStrategy.class)
binder.bind(new TypeLiteral<ResourceManagementStrategy<WorkerTaskRunner>>(){})
.to(SimpleResourceManagementStrategy.class)
.in(LazySingleton.class);