mirror of https://github.com/apache/druid.git
Enable Spotbugs: WMI_WRONG_MAP_ITERATOR (#8005)
* WMI_WRONG_MAP_ITERATOR
* Fixed missing loop
parent fb653ceef9
commit e016995d1f
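
SpotBugs raises WMI_WRONG_MAP_ITERATOR wherever code loops over map.keySet() and then calls map.get(key) inside the loop; iterating entrySet() yields the key and value in one pass and avoids the redundant lookup. That is the rewrite applied throughout the hunks below (KeyedDiff.subtract is the smallest instance). A minimal standalone sketch of the before/after shape — the class and method names here are illustrative only, not part of the Druid codebase:

import java.util.HashMap;
import java.util.Map;

public class WrongMapIteratorExample
{
  // Flagged by WMI_WRONG_MAP_ITERATOR: every iteration pays for an extra hash lookup via get().
  static long sumViaKeySet(Map<String, Long> counts)
  {
    long total = 0;
    for (String key : counts.keySet()) {
      total += counts.get(key);
    }
    return total;
  }

  // Preferred form: entrySet() hands back the key and value together, no second lookup.
  static long sumViaEntrySet(Map<String, Long> counts)
  {
    long total = 0;
    for (Map.Entry<String, Long> entry : counts.entrySet()) {
      total += entry.getValue();
    }
    return total;
  }

  public static void main(String[] args)
  {
    Map<String, Long> counts = new HashMap<>();
    counts.put("a", 1L);
    counts.put("b", 2L);
    System.out.println(sumViaKeySet(counts));   // 3
    System.out.println(sumViaEntrySet(counts)); // 3
  }
}

Enabling the check then only requires deleting the WMI_WRONG_MAP_ITERATOR entry from the SpotBugs exclusion filter, which is what the first hunk does.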
@@ -88,5 +88,4 @@
     <Bug pattern="SWL_SLEEP_WITH_LOCK_HELD"/>
     <Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH"/>
     <Bug pattern="URF_UNREAD_FIELD"/>
-    <Bug pattern="WMI_WRONG_MAP_ITERATOR"/>
 </FindBugsFilter>
@@ -41,12 +41,12 @@ public class KeyedDiff
     }
   }

-  public static Map<String, Long> subtract(Map<String, Long> xs, Map<String, Long> ys)
+  public static Map<String, Long> subtract(Map<String, Long> lhs, Map<String, Long> rhs)
   {
-    assert xs.keySet().equals(ys.keySet());
-    final Map<String, Long> zs = new HashMap<String, Long>();
-    for (String k : xs.keySet()) {
-      zs.put(k, xs.get(k) - ys.get(k));
+    assert lhs.keySet().equals(rhs.keySet());
+    final Map<String, Long> zs = new HashMap<>();
+    for (Map.Entry<String, Long> k : lhs.entrySet()) {
+      zs.put(k.getKey(), k.getValue() - rhs.get(k.getKey()));
     }
     return zs;
   }
@@ -371,16 +371,12 @@ public class MaterializedViewSupervisor implements Supervisor
     Map<Interval, String> toDropInterval = new HashMap<>(difference.entriesOnlyOnRight());
     // if some intervals are in running tasks and the versions are the same, remove it from toBuildInterval
     // if some intervals are in running tasks, but the versions are different, stop the task.
-    for (Interval interval : runningVersion.keySet()) {
-      if (toBuildInterval.containsKey(interval)
-          && toBuildInterval.get(interval).equals(runningVersion.get(interval))
-      ) {
+    for (Map.Entry<Interval, String> version : runningVersion.entrySet()) {
+      final Interval interval = version.getKey();
+      final String host = version.getValue();
+      if (toBuildInterval.containsKey(interval) && toBuildInterval.get(interval).equals(host)) {
         toBuildInterval.remove(interval);
-
-      } else if (
-          toBuildInterval.containsKey(interval)
-          && !toBuildInterval.get(interval).equals(runningVersion.get(interval))
-      ) {
+      } else if (toBuildInterval.containsKey(interval) && !toBuildInterval.get(interval).equals(host)) {
         if (taskMaster.getTaskQueue().isPresent()) {
           taskMaster.getTaskQueue().get().shutdown(runningTasks.get(interval).getId(), "version mismatch");
           runningTasks.remove(interval);
@@ -41,7 +41,7 @@ public class DataSourceOptimizerMonitor extends AbstractMonitor
   @Override
   public boolean doMonitor(ServiceEmitter emitter)
   {
-    List<DataSourceOptimizerStats> stats = optimizer.getAndResetStats();
+    final List<DataSourceOptimizerStats> stats = optimizer.getAndResetStats();
     for (DataSourceOptimizerStats stat : stats) {
       final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
       builder.setDimension("dataSource", stat.getBase());
@@ -50,9 +50,9 @@ public class DataSourceOptimizerMonitor extends AbstractMonitor
       emitter.emit(builder.build("/materialized/view/query/hitRate", stat.getHitRate()));
       emitter.emit(builder.build("/materialized/view/select/avgCostMS", stat.getOptimizerCost()));
       Map<String, Long> derivativesStats = stat.getDerivativesHitCount();
-      for (String derivative : derivativesStats.keySet()) {
-        builder.setDimension("derivative", derivative);
-        emitter.emit(builder.build("/materialized/view/derivative/numSelected", derivativesStats.get(derivative)));
+      for (Map.Entry<String, Long> derivative : derivativesStats.entrySet()) {
+        builder.setDimension("derivative", derivative.getKey());
+        emitter.emit(builder.build("/materialized/view/derivative/numSelected", derivative.getValue()));
       }
       final ServiceMetricEvent.Builder builder2 = new ServiceMetricEvent.Builder();
       builder2.setDimension("dataSource", stat.getBase());
@@ -45,8 +45,6 @@ import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.concurrent.LifecycleLock;
 import org.apache.druid.curator.CuratorUtils;
@ -226,97 +224,92 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
|||
|
||||
// Add listener for creation/deletion of workers
|
||||
workerPathCache.getListenable().addListener(
|
||||
new PathChildrenCacheListener()
|
||||
{
|
||||
@Override
|
||||
public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
|
||||
{
|
||||
final Worker worker;
|
||||
switch (event.getType()) {
|
||||
case CHILD_ADDED:
|
||||
worker = jsonMapper.readValue(
|
||||
event.getData().getData(),
|
||||
Worker.class
|
||||
);
|
||||
synchronized (waitingForMonitor) {
|
||||
waitingFor.increment();
|
||||
}
|
||||
Futures.addCallback(
|
||||
addWorker(worker),
|
||||
new FutureCallback<ZkWorker>()
|
||||
(client, event) -> {
|
||||
final Worker worker;
|
||||
switch (event.getType()) {
|
||||
case CHILD_ADDED:
|
||||
worker = jsonMapper.readValue(
|
||||
event.getData().getData(),
|
||||
Worker.class
|
||||
);
|
||||
synchronized (waitingForMonitor) {
|
||||
waitingFor.increment();
|
||||
}
|
||||
Futures.addCallback(
|
||||
addWorker(worker),
|
||||
new FutureCallback<ZkWorker>()
|
||||
{
|
||||
@Override
|
||||
public void onSuccess(ZkWorker zkWorker)
|
||||
{
|
||||
@Override
|
||||
public void onSuccess(ZkWorker zkWorker)
|
||||
{
|
||||
synchronized (waitingForMonitor) {
|
||||
waitingFor.decrement();
|
||||
waitingForMonitor.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable throwable)
|
||||
{
|
||||
synchronized (waitingForMonitor) {
|
||||
waitingFor.decrement();
|
||||
waitingForMonitor.notifyAll();
|
||||
}
|
||||
synchronized (waitingForMonitor) {
|
||||
waitingFor.decrement();
|
||||
waitingForMonitor.notifyAll();
|
||||
}
|
||||
}
|
||||
);
|
||||
break;
|
||||
case CHILD_UPDATED:
|
||||
worker = jsonMapper.readValue(
|
||||
event.getData().getData(),
|
||||
Worker.class
|
||||
);
|
||||
updateWorker(worker);
|
||||
break;
|
||||
|
||||
case CHILD_REMOVED:
|
||||
worker = jsonMapper.readValue(
|
||||
event.getData().getData(),
|
||||
Worker.class
|
||||
);
|
||||
removeWorker(worker);
|
||||
break;
|
||||
case INITIALIZED:
|
||||
// Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
|
||||
List<String> workers;
|
||||
try {
|
||||
workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
|
||||
}
|
||||
catch (KeeperException.NoNodeException e) {
|
||||
// statusPath doesn't exist yet; can occur if no middleManagers have started.
|
||||
workers = ImmutableList.of();
|
||||
}
|
||||
for (String workerId : workers) {
|
||||
final String workerAnnouncePath = JOINER.join(indexerZkConfig.getAnnouncementsPath(), workerId);
|
||||
final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), workerId);
|
||||
if (!zkWorkers.containsKey(workerId) && cf.checkExists().forPath(workerAnnouncePath) == null) {
|
||||
try {
|
||||
scheduleTasksCleanupForWorker(workerId, cf.getChildren().forPath(workerStatusPath));
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn(
|
||||
e,
|
||||
"Could not schedule cleanup for worker[%s] during startup (maybe someone removed the status znode[%s]?). Skipping.",
|
||||
workerId,
|
||||
workerStatusPath
|
||||
);
|
||||
@Override
|
||||
public void onFailure(Throwable throwable)
|
||||
{
|
||||
synchronized (waitingForMonitor) {
|
||||
waitingFor.decrement();
|
||||
waitingForMonitor.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
break;
|
||||
case CHILD_UPDATED:
|
||||
worker = jsonMapper.readValue(
|
||||
event.getData().getData(),
|
||||
Worker.class
|
||||
);
|
||||
updateWorker(worker);
|
||||
break;
|
||||
|
||||
case CHILD_REMOVED:
|
||||
worker = jsonMapper.readValue(
|
||||
event.getData().getData(),
|
||||
Worker.class
|
||||
);
|
||||
removeWorker(worker);
|
||||
break;
|
||||
case INITIALIZED:
|
||||
// Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
|
||||
List<String> workers;
|
||||
try {
|
||||
workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
|
||||
}
|
||||
catch (KeeperException.NoNodeException e) {
|
||||
// statusPath doesn't exist yet; can occur if no middleManagers have started.
|
||||
workers = ImmutableList.of();
|
||||
}
|
||||
for (String workerId : workers) {
|
||||
final String workerAnnouncePath = JOINER.join(indexerZkConfig.getAnnouncementsPath(), workerId);
|
||||
final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), workerId);
|
||||
if (!zkWorkers.containsKey(workerId) && cf.checkExists().forPath(workerAnnouncePath) == null) {
|
||||
try {
|
||||
scheduleTasksCleanupForWorker(workerId, cf.getChildren().forPath(workerStatusPath));
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn(
|
||||
e,
|
||||
"Could not schedule cleanup for worker[%s] during startup (maybe someone removed the status znode[%s]?). Skipping.",
|
||||
workerId,
|
||||
workerStatusPath
|
||||
);
|
||||
}
|
||||
}
|
||||
synchronized (waitingForMonitor) {
|
||||
waitingFor.decrement();
|
||||
waitingForMonitor.notifyAll();
|
||||
}
|
||||
break;
|
||||
case CONNECTION_SUSPENDED:
|
||||
case CONNECTION_RECONNECTED:
|
||||
case CONNECTION_LOST:
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
synchronized (waitingForMonitor) {
|
||||
waitingFor.decrement();
|
||||
waitingForMonitor.notifyAll();
|
||||
}
|
||||
break;
|
||||
case CONNECTION_SUSPENDED:
|
||||
case CONNECTION_RECONNECTED:
|
||||
case CONNECTION_LOST:
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
);
|
||||
|
@@ -331,7 +324,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
         cleanupExec,
         Period.ZERO.toStandardDuration(),
         config.getWorkerBlackListCleanupPeriod().toStandardDuration(),
-        () -> checkBlackListedNodes()
+        this::checkBlackListedNodes
     );

     provisioningService = provisioningStrategy.makeProvisioningService(this);
@@ -657,50 +650,45 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
private void runPendingTasks()
{
runPendingTasksExec.submit(
new Callable<Void>()
{
@Override
public Void call()
{
try {
// make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them
// into running status
List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
sortByInsertionTime(copy);
(Callable<Void>) () -> {
try {
// make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them
// into running status
List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
sortByInsertionTime(copy);

for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
String taskId = taskRunnerWorkItem.getTaskId();
if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
try {
//this can still be null due to race from explicit task shutdown request
//or if another thread steals and completes this task right after this thread makes copy
//of pending tasks. See https://github.com/apache/incubator-druid/issues/2842 .
Task task = pendingTaskPayloads.get(taskId);
if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
pendingTaskPayloads.remove(taskId);
}
for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
String taskId = taskRunnerWorkItem.getTaskId();
if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
try {
//this can still be null due to race from explicit task shutdown request
//or if another thread steals and completes this task right after this thread makes copy
//of pending tasks. See https://github.com/apache/incubator-druid/issues/2842 .
Task task = pendingTaskPayloads.get(taskId);
if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
pendingTaskPayloads.remove(taskId);
}
catch (Exception e) {
log.makeAlert(e, "Exception while trying to assign task")
.addData("taskId", taskRunnerWorkItem.getTaskId())
.emit();
RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
if (workItem != null) {
taskComplete(workItem, null, TaskStatus.failure(taskId));
}
}
finally {
tryAssignTasks.remove(taskId);
}
catch (Exception e) {
log.makeAlert(e, "Exception while trying to assign task")
.addData("taskId", taskRunnerWorkItem.getTaskId())
.emit();
RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
if (workItem != null) {
taskComplete(workItem, null, TaskStatus.failure(taskId));
}
}
finally {
tryAssignTasks.remove(taskId);
}
}
}
catch (Exception e) {
log.makeAlert(e, "Exception in running pending tasks").emit();
}

return null;
}
catch (Exception e) {
log.makeAlert(e, "Exception in running pending tasks").emit();
}

return null;
}
);
}
@@ -782,16 +770,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
         Maps.transformEntries(
             Maps.filterEntries(
                 zkWorkers,
-                new Predicate<Map.Entry<String, ZkWorker>>()
-                {
-                  @Override
-                  public boolean apply(Map.Entry<String, ZkWorker> input)
-                  {
-                    return !lazyWorkers.containsKey(input.getKey()) &&
-                           !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
-                           !blackListedWorkers.contains(input.getValue());
-                  }
-                }
+                input -> !lazyWorkers.containsKey(input.getKey()) &&
+                         !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
+                         !blackListedWorkers.contains(input.getValue())
             ),
             (String key, ZkWorker value) -> value.toImmutable()
         )
@ -935,111 +916,106 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
|||
|
||||
// Add status listener to the watcher for status changes
|
||||
zkWorker.addListener(
|
||||
new PathChildrenCacheListener()
|
||||
{
|
||||
@Override
|
||||
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
|
||||
{
|
||||
final String taskId;
|
||||
final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
|
||||
synchronized (statusLock) {
|
||||
try {
|
||||
switch (event.getType()) {
|
||||
case CHILD_ADDED:
|
||||
case CHILD_UPDATED:
|
||||
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
|
||||
final TaskAnnouncement announcement = jsonMapper.readValue(
|
||||
event.getData().getData(), TaskAnnouncement.class
|
||||
);
|
||||
(client, event) -> {
|
||||
final String taskId;
|
||||
final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
|
||||
synchronized (statusLock) {
|
||||
try {
|
||||
switch (event.getType()) {
|
||||
case CHILD_ADDED:
|
||||
case CHILD_UPDATED:
|
||||
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
|
||||
final TaskAnnouncement announcement = jsonMapper.readValue(
|
||||
event.getData().getData(), TaskAnnouncement.class
|
||||
);
|
||||
|
||||
log.info(
|
||||
"Worker[%s] wrote %s status for task [%s] on [%s]",
|
||||
zkWorker.getWorker().getHost(),
|
||||
announcement.getTaskStatus().getStatusCode(),
|
||||
log.info(
|
||||
"Worker[%s] wrote %s status for task [%s] on [%s]",
|
||||
zkWorker.getWorker().getHost(),
|
||||
announcement.getTaskStatus().getStatusCode(),
|
||||
taskId,
|
||||
announcement.getTaskLocation()
|
||||
);
|
||||
|
||||
// Synchronizing state with ZK
|
||||
statusLock.notifyAll();
|
||||
|
||||
final RemoteTaskRunnerWorkItem tmp;
|
||||
if ((tmp = runningTasks.get(taskId)) != null) {
|
||||
taskRunnerWorkItem = tmp;
|
||||
} else {
|
||||
final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
|
||||
taskId,
|
||||
announcement.getTaskLocation()
|
||||
announcement.getTaskType(),
|
||||
zkWorker.getWorker(),
|
||||
TaskLocation.unknown(),
|
||||
announcement.getTaskDataSource()
|
||||
);
|
||||
|
||||
// Synchronizing state with ZK
|
||||
statusLock.notifyAll();
|
||||
|
||||
final RemoteTaskRunnerWorkItem tmp;
|
||||
if ((tmp = runningTasks.get(taskId)) != null) {
|
||||
taskRunnerWorkItem = tmp;
|
||||
} else {
|
||||
final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
|
||||
taskId,
|
||||
announcement.getTaskType(),
|
||||
zkWorker.getWorker(),
|
||||
TaskLocation.unknown(),
|
||||
announcement.getTaskDataSource()
|
||||
final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
|
||||
taskId,
|
||||
newTaskRunnerWorkItem
|
||||
);
|
||||
if (existingItem == null) {
|
||||
log.warn(
|
||||
"Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
|
||||
zkWorker.getWorker().getHost(),
|
||||
taskId
|
||||
);
|
||||
final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
|
||||
taskId,
|
||||
newTaskRunnerWorkItem
|
||||
);
|
||||
if (existingItem == null) {
|
||||
log.warn(
|
||||
"Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
|
||||
zkWorker.getWorker().getHost(),
|
||||
taskId
|
||||
);
|
||||
taskRunnerWorkItem = newTaskRunnerWorkItem;
|
||||
} else {
|
||||
taskRunnerWorkItem = existingItem;
|
||||
}
|
||||
}
|
||||
|
||||
if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
|
||||
taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
|
||||
TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
|
||||
}
|
||||
|
||||
if (announcement.getTaskStatus().isComplete()) {
|
||||
taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
|
||||
runPendingTasks();
|
||||
}
|
||||
break;
|
||||
case CHILD_REMOVED:
|
||||
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
|
||||
taskRunnerWorkItem = runningTasks.remove(taskId);
|
||||
if (taskRunnerWorkItem != null) {
|
||||
log.info("Task[%s] just disappeared!", taskId);
|
||||
taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
|
||||
TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
|
||||
taskRunnerWorkItem = newTaskRunnerWorkItem;
|
||||
} else {
|
||||
log.info("Task[%s] went bye bye.", taskId);
|
||||
}
|
||||
break;
|
||||
case INITIALIZED:
|
||||
if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
|
||||
retVal.set(zkWorker);
|
||||
} else {
|
||||
final String message = StringUtils.format(
|
||||
"WTF?! Tried to add already-existing worker[%s]",
|
||||
worker.getHost()
|
||||
);
|
||||
log.makeAlert(message)
|
||||
.addData("workerHost", worker.getHost())
|
||||
.addData("workerIp", worker.getIp())
|
||||
.emit();
|
||||
retVal.setException(new IllegalStateException(message));
|
||||
taskRunnerWorkItem = existingItem;
|
||||
}
|
||||
}
|
||||
|
||||
if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
|
||||
taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
|
||||
TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
|
||||
}
|
||||
|
||||
if (announcement.getTaskStatus().isComplete()) {
|
||||
taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
|
||||
runPendingTasks();
|
||||
break;
|
||||
case CONNECTION_SUSPENDED:
|
||||
case CONNECTION_RECONNECTED:
|
||||
case CONNECTION_LOST:
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.makeAlert(e, "Failed to handle new worker status")
|
||||
.addData("worker", zkWorker.getWorker().getHost())
|
||||
.addData("znode", event.getData().getPath())
|
||||
.emit();
|
||||
}
|
||||
break;
|
||||
case CHILD_REMOVED:
|
||||
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
|
||||
taskRunnerWorkItem = runningTasks.remove(taskId);
|
||||
if (taskRunnerWorkItem != null) {
|
||||
log.info("Task[%s] just disappeared!", taskId);
|
||||
taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
|
||||
TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
|
||||
} else {
|
||||
log.info("Task[%s] went bye bye.", taskId);
|
||||
}
|
||||
break;
|
||||
case INITIALIZED:
|
||||
if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
|
||||
retVal.set(zkWorker);
|
||||
} else {
|
||||
final String message = StringUtils.format(
|
||||
"WTF?! Tried to add already-existing worker[%s]",
|
||||
worker.getHost()
|
||||
);
|
||||
log.makeAlert(message)
|
||||
.addData("workerHost", worker.getHost())
|
||||
.addData("workerIp", worker.getIp())
|
||||
.emit();
|
||||
retVal.setException(new IllegalStateException(message));
|
||||
}
|
||||
runPendingTasks();
|
||||
break;
|
||||
case CONNECTION_SUSPENDED:
|
||||
case CONNECTION_RECONNECTED:
|
||||
case CONNECTION_LOST:
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.makeAlert(e, "Failed to handle new worker status")
|
||||
.addData("worker", zkWorker.getWorker().getHost())
|
||||
.addData("znode", event.getData().getPath())
|
||||
.emit();
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
@@ -1113,45 +1089,40 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
cancelWorkerCleanup(worker);

final ListenableScheduledFuture<?> cleanupTask = cleanupExec.schedule(
new Runnable()
{
@Override
public void run()
{
log.info("Running scheduled cleanup for Worker[%s]", worker);
try {
for (String assignedTask : tasksToFail) {
String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}

if (cf.checkExists().forPath(statusPath) != null) {
cf.delete().guaranteed().forPath(statusPath);
}

log.info("Failing task[%s]", assignedTask);
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
if (taskRunnerWorkItem != null) {
taskRunnerWorkItem.setResult(TaskStatus.failure(assignedTask));
TaskRunnerUtils.notifyStatusChanged(listeners, assignedTask, TaskStatus.failure(assignedTask));
} else {
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
}
() -> {
log.info("Running scheduled cleanup for Worker[%s]", worker);
try {
for (String assignedTask : tasksToFail) {
String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}

// worker is gone, remove worker task status announcements path.
String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker);
if (cf.checkExists().forPath(workerStatusPath) != null) {
cf.delete().guaranteed().forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker));
if (cf.checkExists().forPath(statusPath) != null) {
cf.delete().guaranteed().forPath(statusPath);
}

log.info("Failing task[%s]", assignedTask);
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
if (taskRunnerWorkItem != null) {
taskRunnerWorkItem.setResult(TaskStatus.failure(assignedTask));
TaskRunnerUtils.notifyStatusChanged(listeners, assignedTask, TaskStatus.failure(assignedTask));
} else {
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
}
}
catch (Exception e) {
log.makeAlert("Exception while cleaning up worker[%s]", worker).emit();
throw new RuntimeException(e);

// worker is gone, remove worker task status announcements path.
String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker);
if (cf.checkExists().forPath(workerStatusPath) != null) {
cf.delete().guaranteed().forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker));
}
}
catch (Exception e) {
log.makeAlert("Exception while cleaning up worker[%s]", worker).emit();
throw new RuntimeException(e);
}
},
config.getTaskCleanupTimeout().toStandardDuration().getMillis(),
TimeUnit.MILLISECONDS
@@ -1248,14 +1219,12 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     }
     // status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
     synchronized (statusLock) {
-      Iterator<String> iterator = zkWorkers.keySet().iterator();
-      while (iterator.hasNext()) {
-        String worker = iterator.next();
-        ZkWorker zkWorker = zkWorkers.get(worker);
+      for (Map.Entry<String, ZkWorker> worker : zkWorkers.entrySet()) {
+        final ZkWorker zkWorker = worker.getValue();
         try {
           if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.toImmutable())) {
             log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
-            lazyWorkers.put(worker, zkWorker);
+            lazyWorkers.put(worker.getKey(), zkWorker);
             if (lazyWorkers.size() == maxWorkers) {
               // only mark excess workers as lazy and allow their cleanup
               break;
@@ -236,7 +236,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
         cleanupExec,
         Period.ZERO.toStandardDuration(),
         config.getWorkerBlackListCleanupPeriod().toStandardDuration(),
-        () -> checkAndRemoveWorkersFromBlackList()
+        this::checkAndRemoveWorkersFromBlackList
     );

     provisioningService = provisioningStrategy.makeProvisioningService(this);
@@ -329,16 +329,9 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
         Maps.transformEntries(
             Maps.filterEntries(
                 workers,
-                new Predicate<Map.Entry<String, WorkerHolder>>()
-                {
-                  @Override
-                  public boolean apply(Map.Entry<String, WorkerHolder> input)
-                  {
-                    return !lazyWorkers.containsKey(input.getKey()) &&
-                           !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
-                           !blackListedWorkers.containsKey(input.getKey());
-                  }
-                }
+                input -> !lazyWorkers.containsKey(input.getKey()) &&
+                         !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
+                         !blackListedWorkers.containsKey(input.getKey())
             ),
             (String key, WorkerHolder value) -> value.toImmutable()
         )
@@ -566,41 +559,36 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
cancelWorkerCleanup(workerHostAndPort);

final ListenableScheduledFuture<?> cleanupTask = cleanupExec.schedule(
new Runnable()
{
@Override
public void run()
{
log.info("Running scheduled cleanup for Worker[%s]", workerHostAndPort);
try {
Set<HttpRemoteTaskRunnerWorkItem> tasksToFail = new HashSet<>();
synchronized (statusLock) {
for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> e : tasks.entrySet()) {
if (e.getValue().getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
Worker w = e.getValue().getWorker();
if (w != null && w.getHost().equals(workerHostAndPort)) {
tasksToFail.add(e.getValue());
}
() -> {
log.info("Running scheduled cleanup for Worker[%s]", workerHostAndPort);
try {
Set<HttpRemoteTaskRunnerWorkItem> tasksToFail = new HashSet<>();
synchronized (statusLock) {
for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> e : tasks.entrySet()) {
if (e.getValue().getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
Worker w = e.getValue().getWorker();
if (w != null && w.getHost().equals(workerHostAndPort)) {
tasksToFail.add(e.getValue());
}
}
}
}

for (HttpRemoteTaskRunnerWorkItem taskItem : tasksToFail) {
if (!taskItem.getResult().isDone()) {
log.info(
"Failing task[%s] because worker[%s] disappeared and did not report within cleanup timeout[%s].",
workerHostAndPort,
taskItem.getTaskId(),
config.getTaskCleanupTimeout()
);
taskComplete(taskItem, null, TaskStatus.failure(taskItem.getTaskId()));
}
for (HttpRemoteTaskRunnerWorkItem taskItem : tasksToFail) {
if (!taskItem.getResult().isDone()) {
log.info(
"Failing task[%s] because worker[%s] disappeared and did not report within cleanup timeout[%s].",
workerHostAndPort,
taskItem.getTaskId(),
config.getTaskCleanupTimeout()
);
taskComplete(taskItem, null, TaskStatus.failure(taskItem.getTaskId()));
}
}
catch (Exception e) {
log.makeAlert("Exception while cleaning up worker[%s]", workerHostAndPort).emit();
throw new RuntimeException(e);
}
}
catch (Exception e) {
log.makeAlert("Exception while cleaning up worker[%s]", workerHostAndPort).emit();
throw new RuntimeException(e);
}
},
config.getTaskCleanupTimeout().toStandardDuration().getMillis(),
@@ -779,14 +767,12 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers)
   {
     synchronized (statusLock) {
-      Iterator<String> iterator = workers.keySet().iterator();
-      while (iterator.hasNext()) {
-        String worker = iterator.next();
-        WorkerHolder workerHolder = workers.get(worker);
+      for (Map.Entry<String, WorkerHolder> worker : workers.entrySet()) {
+        final WorkerHolder workerHolder = worker.getValue();
         try {
           if (isWorkerOkForMarkingLazy(workerHolder.getWorker()) && isLazyWorker.apply(workerHolder.toImmutable())) {
             log.info("Adding Worker[%s] to lazySet!", workerHolder.getWorker().getHost());
-            lazyWorkers.put(worker, workerHolder);
+            lazyWorkers.put(worker.getKey(), workerHolder);
             if (lazyWorkers.size() == maxWorkers) {
               // only mark excess workers as lazy and allow their cleanup
               break;
@@ -835,7 +821,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     return tasks.values()
                 .stream()
                 .filter(item -> item.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING)
-                .map(item -> item.getTask())
+                .map(HttpRemoteTaskRunnerWorkItem::getTask)
                 .collect(Collectors.toList());
   }
 }
@@ -115,8 +115,8 @@ public class SupervisorManager

     synchronized (lock) {
       Map<String, SupervisorSpec> supervisors = metadataSupervisorManager.getLatest();
-      for (String id : supervisors.keySet()) {
-        SupervisorSpec spec = supervisors.get(id);
+      for (Map.Entry<String, SupervisorSpec> supervisor : supervisors.entrySet()) {
+        final SupervisorSpec spec = supervisor.getValue();
         if (!(spec instanceof NoopSupervisorSpec)) {
           try {
             createAndStartSupervisorInternal(spec, false);
@@ -95,14 +95,14 @@ class LookupListeningResource extends ListenerResource
   public Object post(final Map<String, LookupExtractorFactory> lookups)
   {
     final Map<String, LookupExtractorFactory> failedUpdates = new HashMap<>();
-    for (final String name : lookups.keySet()) {
+    for (final Map.Entry<String, LookupExtractorFactory> lookup : lookups.entrySet()) {

       final LookupExtractorFactoryContainer factoryContainer = new LookupExtractorFactoryContainer(
           null,
-          lookups.get(name)
+          lookup.getValue()
       );

-      manager.add(name, factoryContainer);
+      manager.add(lookup.getKey(), factoryContainer);
     }
     return ImmutableMap.of("status", "accepted", LookupModule.FAILED_UPDATES_KEY, failedUpdates);
   }