Revert "Fix synchronization issues of AbstractYarnScheduler#nodeUpdate and its implementations. (Naganarasimha G R via wangda)" because forgot to add JIRA Number
This reverts commit 554e6656bc
.
This commit is contained in:
parent
554e6656bc
commit
cb64e8eb19
|
@ -36,7 +36,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||||
|
@ -51,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
@ -76,15 +76,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReco
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||||
import org.apache.hadoop.yarn.server.utils.Lock;
|
import org.apache.hadoop.yarn.server.utils.Lock;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.SystemClock;
|
import org.apache.hadoop.yarn.util.SystemClock;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
|
|
||||||
|
@ -395,8 +394,8 @@ public abstract class AbstractYarnScheduler
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void recoverContainersOnNode(List<NMContainerStatus> containerReports,
|
public synchronized void recoverContainersOnNode(
|
||||||
RMNode nm) {
|
List<NMContainerStatus> containerReports, RMNode nm) {
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
if (!rmContext.isWorkPreservingRecoveryEnabled()
|
if (!rmContext.isWorkPreservingRecoveryEnabled()
|
||||||
|
@ -991,7 +990,7 @@ public abstract class AbstractYarnScheduler
|
||||||
* Process a heartbeat update from a node.
|
* Process a heartbeat update from a node.
|
||||||
* @param nm The RMNode corresponding to the NodeManager
|
* @param nm The RMNode corresponding to the NodeManager
|
||||||
*/
|
*/
|
||||||
protected void nodeUpdate(RMNode nm) {
|
protected synchronized void nodeUpdate(RMNode nm) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("nodeUpdate: " + nm +
|
LOG.debug("nodeUpdate: " + nm +
|
||||||
" cluster capacity: " + getClusterResource());
|
" cluster capacity: " + getClusterResource());
|
||||||
|
|
|
@ -18,9 +18,24 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import java.io.IOException;
|
||||||
import com.google.common.base.Preconditions;
|
import java.io.InputStream;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -88,9 +103,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
|
||||||
|
@ -125,24 +141,9 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
import java.io.IOException;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import java.io.InputStream;
|
import com.google.common.base.Preconditions;
|
||||||
import java.util.ArrayList;
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.Random;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
|
|
||||||
@LimitedPrivate("yarn")
|
@LimitedPrivate("yarn")
|
||||||
@Evolving
|
@Evolving
|
||||||
|
@ -1016,7 +1017,7 @@ public class CapacityScheduler extends
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void nodeUpdate(RMNode rmNode) {
|
protected synchronized void nodeUpdate(RMNode rmNode) {
|
||||||
try {
|
try {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
setLastNodeUpdateTime(Time.now());
|
setLastNodeUpdateTime(Time.now());
|
||||||
|
|
|
@ -18,15 +18,23 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import java.io.IOException;
|
||||||
import com.google.common.base.Preconditions;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
@ -41,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
@ -86,16 +95,8 @@ import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
import java.io.IOException;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import java.util.ArrayList;
|
import com.google.common.base.Preconditions;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A scheduler that schedules resources between a set of queues. The scheduler
|
* A scheduler that schedules resources between a set of queues. The scheduler
|
||||||
|
@ -1069,7 +1070,7 @@ public class FairScheduler extends
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void nodeUpdate(RMNode nm) {
|
protected synchronized void nodeUpdate(RMNode nm) {
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
long start = getClock().getTime();
|
long start = getClock().getTime();
|
||||||
|
|
|
@ -18,7 +18,16 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||||
|
@ -45,7 +54,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
|
@ -88,15 +96,7 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
import java.io.IOException;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
|
||||||
|
|
||||||
@LimitedPrivate("yarn")
|
@LimitedPrivate("yarn")
|
||||||
@Evolving
|
@Evolving
|
||||||
|
@ -978,10 +978,4 @@ public class FifoScheduler extends
|
||||||
|
|
||||||
updateAvailableResourcesMetrics();
|
updateAvailableResourcesMetrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void recoverContainersOnNode(
|
|
||||||
List<NMContainerStatus> containerReports, RMNode nm) {
|
|
||||||
super.recoverContainersOnNode(containerReports, nm);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue