YARN-9341. Fixed enentrant lock usage in YARN project.

Contributed by Prabhu Joseph
This commit is contained in:
Eric Yang 2019-03-07 16:47:45 -05:00
parent 1bc282e0b3
commit 39b4a37e02
52 changed files with 388 additions and 412 deletions

View File

@ -117,8 +117,8 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
@Override
public void handle(ServiceEvent event) {
try {
writeLock.lock();
try {
State oldState = getState();
try {
stateMachine.doTransition(event.getType(), event);

View File

@ -1090,8 +1090,8 @@ public class Component implements EventHandler<ComponentEvent> {
@Override
public void handle(ComponentEvent event) {
try {
writeLock.lock();
try {
ComponentState oldState = getState();
try {
stateMachine.doTransition(event.getType(), event);

View File

@ -751,8 +751,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
@Override
public void handle(ComponentInstanceEvent event) {
try {
writeLock.lock();
try {
ComponentInstanceState oldState = getState();
try {
stateMachine.doTransition(event.getType(), event);
@ -782,8 +782,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
void updateLocalizationStatuses(
List<org.apache.hadoop.yarn.api.records.LocalizationStatus> statuses) {
Map<String, String> resourcesCpy = new HashMap<>();
try {
readLock.lock();
try {
if (resolvedParams == null || resolvedParams.didLaunchFail() ||
resolvedParams.getResolvedRsrcPaths() == null ||
resolvedParams.getResolvedRsrcPaths().isEmpty()) {
@ -823,8 +823,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
public void updateResolvedLaunchParams(
Future<ProviderService.ResolvedLaunchParams> future) {
try {
writeLock.lock();
try {
this.resolvedParams = future.get();
} catch (InterruptedException | ExecutionException e) {
LOG.error("{} updating resolved params", getCompInstanceId(), e);
@ -834,8 +834,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
public ContainerStatus getContainerStatus() {
try {
readLock.lock();
try {
return status;
} finally {
readLock.unlock();
@ -844,8 +844,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
private void setContainerStatus(ContainerId containerId,
ContainerStatus latestStatus) {
try {
writeLock.lock();
try {
this.status = latestStatus;
org.apache.hadoop.yarn.service.api.records.Container containerRec =
getCompSpec().getContainer(containerId.toString());

View File

@ -478,8 +478,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
@Override
public void flush() throws IOException {
try {
this.domainFDLocker.lock();
try {
if (domainLogFD != null) {
domainLogFD.flush();
}
@ -494,8 +494,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private Map<ApplicationAttemptId, EntityLogFD> copySummaryLogFDs(
Map<ApplicationAttemptId, EntityLogFD> summanyLogFDsToCopy) {
try {
summaryTableCopyLocker.lock();
try {
return new HashMap<ApplicationAttemptId, EntityLogFD>(
summanyLogFDsToCopy);
} finally {
@ -506,8 +506,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
EntityLogFD>> copyEntityLogFDs(Map<ApplicationAttemptId,
HashMap<TimelineEntityGroupId, EntityLogFD>> entityLogFDsToCopy) {
try {
entityTableCopyLocker.lock();
try {
return new HashMap<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
EntityLogFD>>(entityLogFDsToCopy);
} finally {
@ -521,8 +521,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
.entrySet()) {
EntityLogFD logFD = logFDEntry.getValue();
try {
logFD.lock();
try {
logFD.flush();
} finally {
logFD.unlock();
@ -541,8 +541,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
: logFDMap.entrySet()) {
EntityLogFD logFD = logFDEntry.getValue();
try {
logFD.lock();
try {
logFD.flush();
} finally {
logFD.unlock();
@ -567,8 +567,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private void cleanInActiveFDs() {
long currentTimeStamp = Time.monotonicNow();
try {
this.domainFDLocker.lock();
try {
if (domainLogFD != null) {
if (currentTimeStamp - domainLogFD.getLastModifiedTime() >= ttl) {
domainLogFD.close();
@ -593,8 +593,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
.entrySet()) {
EntityLogFD logFD = logFDEntry.getValue();
try {
logFD.lock();
try {
if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
logFD.close();
}
@ -617,8 +617,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
: logFDMap.entrySet()) {
EntityLogFD logFD = logFDEntry.getValue();
try {
logFD.lock();
try {
if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
logFD.close();
}
@ -644,8 +644,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private class TimerMonitorTask extends TimerTask {
@Override
public void run() {
try {
timerTasksMonitorWriteLock.lock();
try {
monitorTimerTasks();
} finally {
timerTasksMonitorWriteLock.unlock();
@ -691,8 +691,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
monitorTaskTimer = null;
}
try {
this.domainFDLocker.lock();
try {
if (domainLogFD != null) {
domainLogFD.close();
domainLogFD = null;
@ -708,8 +708,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private void closeEntityFDs(Map<ApplicationAttemptId,
HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) {
try {
entityTableLocker.lock();
try {
if (!logFDs.isEmpty()) {
for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
@ -734,8 +734,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private void closeSummaryFDs(
Map<ApplicationAttemptId, EntityLogFD> logFDs) {
try {
summaryTableLocker.lock();
try {
if (!logFDs.isEmpty()) {
for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry
: logFDs.entrySet()) {
@ -757,8 +757,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
ObjectMapper objMapper, TimelineDomain domain,
boolean isAppendSupported) throws IOException {
checkAndStartTimeTasks();
try {
this.domainFDLocker.lock();
try {
if (this.domainLogFD != null) {
this.domainLogFD.writeDomain(domain);
} else {
@ -790,8 +790,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
if (logMapFD != null) {
EntityLogFD logFD = logMapFD.get(groupId);
if (logFD != null) {
try {
logFD.lock();
try {
if (serviceStopped) {
return;
}
@ -814,8 +814,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
TimelineEntityGroupId groupId, List<TimelineEntity> entities,
boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<
TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException{
try {
entityTableLocker.lock();
try {
if (serviceStopped) {
return;
}
@ -828,11 +828,11 @@ public class FileSystemTimelineWriter extends TimelineWriter{
if (logFD == null) {
logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
}
try {
logFD.lock();
logFD.writeEntities(entities);
try {
logFD.writeEntities(entities);
entityTableCopyLocker.lock();
try {
logFDMap.put(groupId, logFD);
logFDs.put(attemptId, logFDMap);
} finally {
@ -862,8 +862,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
EntityLogFD logFD = null;
logFD = logFDs.get(attemptId);
if (logFD != null) {
try {
logFD.lock();
try {
if (serviceStopped) {
return;
}
@ -881,8 +881,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
ObjectMapper objMapper, ApplicationAttemptId attemptId,
List<TimelineEntity> entities, boolean isAppendSupported,
Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
try {
summaryTableLocker.lock();
try {
if (serviceStopped) {
return;
}
@ -890,11 +890,11 @@ public class FileSystemTimelineWriter extends TimelineWriter{
if (logFD == null) {
logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
}
try {
logFD.lock();
logFD.writeEntities(entities);
try {
logFD.writeEntities(entities);
summaryTableCopyLocker.lock();
try {
logFDs.put(attemptId, logFD);
} finally {
summaryTableCopyLocker.unlock();
@ -928,12 +928,12 @@ public class FileSystemTimelineWriter extends TimelineWriter{
}
private void checkAndStartTimeTasks() {
try {
this.timerTasksMonitorReadLock.lock();
try {
this.timeStampOfLastWrite = Time.monotonicNow();
if(!timerTaskStarted) {
try {
timerTaskLocker.lock();
try {
if (!timerTaskStarted) {
createAndStartTimerTasks();
timerTaskStarted = true;

View File

@ -766,8 +766,8 @@ public class CommonNodeLabelsManager extends AbstractService {
@SuppressWarnings("unchecked")
private <T> Map<NodeId, Set<T>> generateNodeLabelsInfoPerNode(Class<T> type) {
try {
readLock.lock();
try {
Map<NodeId, Set<T>> nodeToLabels = new HashMap<>();
for (Entry<String, Host> entry : nodeCollections.entrySet()) {
String hostName = entry.getKey();
@ -809,8 +809,8 @@ public class CommonNodeLabelsManager extends AbstractService {
* @return set of nodes with no labels
*/
public Set<NodeId> getNodesWithoutALabel() {
try {
readLock.lock();
try {
Set<NodeId> nodes = new HashSet<>();
for (Host host : nodeCollections.values()) {
for (NodeId nodeId : host.nms.keySet()) {
@ -832,8 +832,8 @@ public class CommonNodeLabelsManager extends AbstractService {
* @return labels to nodes map
*/
public Map<String, Set<NodeId>> getLabelsToNodes() {
try {
readLock.lock();
try {
return getLabelsToNodes(labelCollections.keySet());
} finally {
readLock.unlock();
@ -848,8 +848,8 @@ public class CommonNodeLabelsManager extends AbstractService {
* @return labels to nodes map
*/
public Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels) {
try {
readLock.lock();
try {
Map<String, Set<NodeId>> labelsToNodes = getLabelsToNodesMapping(labels,
String.class);
return Collections.unmodifiableMap(labelsToNodes);
@ -865,8 +865,8 @@ public class CommonNodeLabelsManager extends AbstractService {
* @return labels to nodes map
*/
public Map<NodeLabel, Set<NodeId>> getLabelsInfoToNodes() {
try {
readLock.lock();
try {
return getLabelsInfoToNodes(labelCollections.keySet());
} finally {
readLock.unlock();
@ -882,8 +882,8 @@ public class CommonNodeLabelsManager extends AbstractService {
* @return labels to nodes map
*/
public Map<NodeLabel, Set<NodeId>> getLabelsInfoToNodes(Set<String> labels) {
try {
readLock.lock();
try {
Map<NodeLabel, Set<NodeId>> labelsToNodes = getLabelsToNodesMapping(
labels, NodeLabel.class);
return Collections.unmodifiableMap(labelsToNodes);
@ -922,8 +922,8 @@ public class CommonNodeLabelsManager extends AbstractService {
* @return existing valid labels in repository
*/
public Set<String> getClusterNodeLabelNames() {
try {
readLock.lock();
try {
Set<String> labels = new HashSet<String>(labelCollections.keySet());
labels.remove(NO_LABEL);
return Collections.unmodifiableSet(labels);
@ -933,8 +933,8 @@ public class CommonNodeLabelsManager extends AbstractService {
}
public List<NodeLabel> getClusterNodeLabels() {
try {
readLock.lock();
try {
List<NodeLabel> nodeLabels = new ArrayList<>();
for (RMNodeLabel label : labelCollections.values()) {
if (!label.getLabelName().equals(NO_LABEL)) {
@ -952,8 +952,8 @@ public class CommonNodeLabelsManager extends AbstractService {
if (nodeLabel.equals(NO_LABEL)) {
return noNodeLabel.getIsExclusive();
}
try {
readLock.lock();
try {
RMNodeLabel label = labelCollections.get(nodeLabel);
if (label == null) {
String message =
@ -1048,8 +1048,8 @@ public class CommonNodeLabelsManager extends AbstractService {
}
public Set<NodeLabel> getLabelsInfoByNode(NodeId nodeId) {
try {
readLock.lock();
try {
Set<String> labels = getLabelsByNode(nodeId, nodeCollections);
if (labels.isEmpty()) {
return EMPTY_NODELABEL_SET;

View File

@ -93,10 +93,10 @@ public class NonAppendableFSNodeLabelStore extends FileSystemNodeLabelsStore {
private void writeNewMirror() throws IOException {
ReentrantReadWriteLock.ReadLock readLock = manager.readLock;
try {
// Acquire readlock to make sure we get cluster node labels and
// node-to-labels mapping atomically.
readLock.lock();
try {
// Write mirror to mirror.new.tmp file
Path newTmpPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".new.tmp");
try (FSDataOutputStream os = fs.create(newTmpPath, true)) {

View File

@ -57,8 +57,8 @@ public class ConfiguredYarnAuthorizer extends YarnAuthorizationProvider {
@Override
public void setPermission(List<Permission> permissions,
UserGroupInformation user) {
try {
writeLock.lock();
try {
for (Permission perm : permissions) {
allAcls.put(perm.getTarget(), perm.getAcls());
}
@ -94,8 +94,8 @@ public class ConfiguredYarnAuthorizer extends YarnAuthorizationProvider {
@Override
public boolean checkPermission(AccessRequest accessRequest) {
try {
readLock.lock();
try {
return checkPermissionInternal(accessRequest.getAccessType(),
accessRequest.getEntity(), accessRequest.getUser());
} finally {

View File

@ -986,8 +986,8 @@ public class LeveldbTimelineStore extends AbstractService
@Override
public TimelinePutResponse put(TimelineEntities entities) {
try {
deleteLock.readLock().lock();
try {
TimelinePutResponse response = new TimelinePutResponse();
for (TimelineEntity entity : entities.getEntities()) {
put(entity, response, false);
@ -1001,8 +1001,8 @@ public class LeveldbTimelineStore extends AbstractService
@Private
@VisibleForTesting
public TimelinePutResponse putWithNoDomainId(TimelineEntities entities) {
try {
deleteLock.readLock().lock();
try {
TimelinePutResponse response = new TimelinePutResponse();
for (TimelineEntity entity : entities.getEntities()) {
put(entity, response, true);
@ -1525,8 +1525,8 @@ public class LeveldbTimelineStore extends AbstractService
LeveldbIterator iterator = null;
LeveldbIterator pfIterator = null;
long typeCount = 0;
try {
deleteLock.writeLock().lock();
try {
iterator = getDbIterator(false);
pfIterator = getDbIterator(false);

View File

@ -569,8 +569,8 @@ public abstract class ContainerExecutor implements Configurable {
* @return the path of the pid-file for the given containerId.
*/
protected Path getPidFilePath(ContainerId containerId) {
try {
readLock.lock();
try {
return (this.pidFiles.get(containerId));
} finally {
readLock.unlock();
@ -720,9 +720,8 @@ public abstract class ContainerExecutor implements Configurable {
* @return true if the container is active
*/
protected boolean isContainerActive(ContainerId containerId) {
try {
readLock.lock();
try {
return (this.pidFiles.containsKey(containerId));
} finally {
readLock.unlock();
@ -742,8 +741,8 @@ public abstract class ContainerExecutor implements Configurable {
* of the launched process
*/
public void activateContainer(ContainerId containerId, Path pidFilePath) {
try {
writeLock.lock();
try {
this.pidFiles.put(containerId, pidFilePath);
} finally {
writeLock.unlock();
@ -778,8 +777,8 @@ public abstract class ContainerExecutor implements Configurable {
* @param containerId the container ID
*/
public void deactivateContainer(ContainerId containerId) {
try {
writeLock.lock();
try {
this.pidFiles.remove(containerId);
} finally {
writeLock.unlock();

View File

@ -667,8 +667,8 @@ public class ApplicationImpl implements Application {
@VisibleForTesting
public LogAggregationContext getLogAggregationContext() {
try {
this.readLock.lock();
try {
return this.logAggregationContext;
} finally {
this.readLock.unlock();

View File

@ -953,8 +953,8 @@ public class ContainerImpl implements Container {
@Override
public void setIpAndHost(String[] ipAndHost) {
try {
this.writeLock.lock();
try {
this.ips = ipAndHost[0];
this.host = ipAndHost[1];
} finally {
@ -2107,9 +2107,8 @@ public class ContainerImpl implements Container {
@Override
public void handle(ContainerEvent event) {
try {
this.writeLock.lock();
try {
ContainerId containerID = event.getContainerID();
if (LOG.isDebugEnabled()) {
LOG.debug("Processing " + containerID + " of type " + event.getType());

View File

@ -130,8 +130,8 @@ class CGroupsHandlerImpl implements CGroupsHandler {
@Override
public String getControllerPath(CGroupController controller) {
try {
rwLock.readLock().lock();
try {
return controllerPaths.get(controller);
} finally {
rwLock.readLock().unlock();
@ -169,8 +169,8 @@ class CGroupsHandlerImpl implements CGroupsHandler {
}
// we want to do a bulk update without the paths changing concurrently
try {
rwLock.writeLock().lock();
try {
controllerPaths = cPaths;
parsedMtab = newMtab;
} finally {
@ -293,10 +293,9 @@ class CGroupsHandlerImpl implements CGroupsHandler {
if (existingMountPath == null ||
!requestedMountPath.equals(existingMountPath)) {
try {
//lock out other readers/writers till we are done
rwLock.writeLock().lock();
try {
// If the controller was already mounted we have to mount it
// with the same options to clone the mount point otherwise
// the operation will fail

View File

@ -116,8 +116,8 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
.append(getState() == ResourceState.LOCALIZED
? getLocalPath() + "," + getSize()
: "pending").append(",[");
try {
this.readLock.lock();
try {
for (ContainerId c : ref) {
sb.append("(").append(c.toString()).append(")");
}
@ -187,9 +187,8 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
@Override
public void handle(ResourceEvent event) {
try {
this.writeLock.lock();
try {
Path resourcePath = event.getLocalResourceRequest().getPath();
if (LOG.isDebugEnabled()) {
LOG.debug("Processing " + resourcePath + " of type " + event.getType());

View File

@ -1194,8 +1194,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
final Lock rLock = rwLock.readLock();
final Lock wLock = rwLock.writeLock();
try {
wLock.lock();
try {
Runnable runnable = new Runnable() {
@Override
public void run() {

View File

@ -96,8 +96,8 @@ public class FifoCandidatesSelector
.getResToObtainByPartitionForLeafQueue(preemptionContext,
queueName, clusterResource);
try {
leafQueue.getReadLock().lock();
try {
// go through all ignore-partition-exclusivity containers first to make
// sure such containers will be preemptionCandidates first
Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =

View File

@ -178,8 +178,8 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
// 7. Based on the selected resource demand per partition, select
// containers with known policy from inter-queue preemption.
try {
leafQueue.getReadLock().lock();
try {
for (FiCaSchedulerApp app : apps) {
preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates,
curCandidates, clusterResource, totalPreemptedResourceAllowed,

View File

@ -566,10 +566,9 @@ public class ProportionalCapacityPreemptionPolicy
Resource partitionResource, String partitionToLookAt) {
TempQueuePerPartition ret;
ReadLock readLock = curQueue.getReadLock();
try {
// Acquire a read lock from Parent/LeafQueue.
readLock.lock();
try {
String queueName = curQueue.getQueueName();
QueueCapacities qc = curQueue.getQueueCapacities();
float absCap = qc.getAbsoluteCapacity(partitionToLookAt);

View File

@ -158,9 +158,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
AttributeMappingOperationType op,
Map<NodeAttributeKey, RMNodeAttribute> newAttributesToBeAdded,
String attributePrefix) {
try {
writeLock.lock();
try {
// shows node->attributes Mapped as part of this operation.
StringBuilder logMsg = new StringBuilder(op.name());
logMsg.append(" attributes on nodes:");
@ -403,8 +402,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
public Map<NodeAttributeKey,
Map<String, AttributeValue>> getAttributesToNodes(
Set<NodeAttributeKey> attributes) {
try {
readLock.lock();
try {
boolean fetchAllAttributes = (attributes == null || attributes.isEmpty());
Map<NodeAttributeKey, Map<String, AttributeValue>> attributesToNodes =
new HashMap<>();
@ -423,8 +422,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
}
public Resource getResourceByAttribute(NodeAttribute attribute) {
try {
readLock.lock();
try {
return clusterAttributes.containsKey(attribute.getAttributeKey())
? clusterAttributes.get(attribute.getAttributeKey()).getResource()
: Resource.newInstance(0, 0);
@ -436,8 +435,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
@Override
public Map<NodeAttribute, AttributeValue> getAttributesForNode(
String hostName) {
try {
readLock.lock();
try {
return nodeCollections.containsKey(hostName)
? nodeCollections.get(hostName).getAttributes()
: new HashMap<>();
@ -448,8 +447,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
@Override
public List<NodeToAttributes> getNodeToAttributes(Set<String> prefix) {
try {
readLock.lock();
try {
List<NodeToAttributes> nodeToAttributes = new ArrayList<>();
nodeCollections.forEach((k, v) -> {
List<NodeAttribute> attrs;
@ -476,8 +475,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
@Override
public Map<String, Set<NodeAttribute>> getNodesToAttributes(
Set<String> hostNames) {
try {
readLock.lock();
try {
boolean fetchAllNodes = (hostNames == null || hostNames.isEmpty());
Map<String, Set<NodeAttribute>> nodeToAttrs = new HashMap<>();
if (fetchAllNodes) {
@ -498,8 +497,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
}
public void activateNode(NodeId nodeId, Resource resource) {
try {
writeLock.lock();
try {
String hostName = nodeId.getHost();
Host host = nodeCollections.get(hostName);
if (host == null) {
@ -516,8 +515,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
}
public void deactivateNode(NodeId nodeId) {
try {
writeLock.lock();
try {
Host host = nodeCollections.get(nodeId.getHost());
for (NodeAttribute attribute : host.getAttributes().keySet()) {
clusterAttributes.get(attribute.getAttributeKey())

View File

@ -71,9 +71,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
@Override
public void addLabelsToNode(Map<NodeId, Set<String>> addedLabelsToNode)
throws IOException {
try {
writeLock.lock();
try {
// get nodesCollection before edition
Map<String, Host> before = cloneNodeMap(addedLabelsToNode.keySet());
@ -112,8 +111,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
@Override
public void removeFromClusterNodeLabels(Collection<String> labelsToRemove)
throws IOException {
try {
writeLock.lock();
try {
if (!isInitNodeLabelStoreInProgress()) {
// We cannot remove node labels from collection when some queue(s) are
// using any of them.
@ -137,8 +136,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
@Override
public void addToCluserNodeLabels(Collection<NodeLabel> labels)
throws IOException {
try {
writeLock.lock();
try {
super.addToCluserNodeLabels(labels);
} finally {
writeLock.unlock();
@ -149,9 +148,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
public void
removeLabelsFromNode(Map<NodeId, Set<String>> removeLabelsFromNode)
throws IOException {
try {
writeLock.lock();
try {
// get nodesCollection before edition
Map<String, Host> before =
cloneNodeMap(removeLabelsFromNode.keySet());
@ -171,9 +169,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
@Override
public void replaceLabelsOnNode(Map<NodeId, Set<String>> replaceLabelsToNode)
throws IOException {
try {
writeLock.lock();
try {
Map<NodeId, Set<String>> effectiveModifiedLabelMappings =
getModifiedNodeLabelsMappings(replaceLabelsToNode);
@ -230,9 +227,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
* will update running nodes resource
*/
public void activateNode(NodeId nodeId, Resource resource) {
try {
writeLock.lock();
try {
// save if we have a node before
Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId));
@ -273,9 +269,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
* Following methods are used for setting if a node unregistered to RM
*/
public void deactivateNode(NodeId nodeId) {
try {
writeLock.lock();
try {
// save if we have a node before
Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId));
Node nm = getNMInNodeSet(nodeId);
@ -314,8 +309,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
}
public void reinitializeQueueLabels(Map<String, Set<String>> queueToLabels) {
try {
writeLock.lock();
try {
// clear before set
this.queueCollections.clear();
@ -347,8 +342,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
public Resource getQueueResource(String queueName, Set<String> queueLabels,
Resource clusterResource) {
try {
readLock.lock();
try {
if (queueLabels.contains(ANY)) {
return clusterResource;
}
@ -369,8 +364,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
if (label == null) {
return 0;
}
try {
readLock.lock();
try {
RMNodeLabel labelInfo = labelCollections.get(label);
return (labelInfo == null) ? 0 : labelInfo.getNumActiveNMs();
} finally {
@ -379,8 +374,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
}
public Set<String> getLabelsOnNode(NodeId nodeId) {
try {
readLock.lock();
try {
Set<String> nodeLabels = getLabelsByNode(nodeId);
return Collections.unmodifiableSet(nodeLabels);
} finally {
@ -389,8 +384,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
}
public boolean containsNodeLabel(String label) {
try {
readLock.lock();
try {
return label != null
&& (label.isEmpty() || labelCollections.containsKey(label));
} finally {
@ -522,8 +517,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
if (label.equals(NO_LABEL)) {
return noNodeLabel.getResource();
}
try {
readLock.lock();
try {
RMNodeLabel nodeLabel = labelCollections.get(label);
if (nodeLabel == null) {
return Resources.none();
@ -572,8 +567,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
}
public List<RMNodeLabel> pullRMNodeLabelsInfo() {
try {
readLock.lock();
try {
List<RMNodeLabel> infos = new ArrayList<RMNodeLabel>();
for (Entry<String, RMNodeLabel> entry : labelCollections.entrySet()) {

View File

@ -45,8 +45,8 @@ public class PlacementManager {
}
public void updateRules(List<PlacementRule> rules) {
try {
writeLock.lock();
try {
this.rules = rules;
} finally {
writeLock.unlock();
@ -55,10 +55,8 @@ public class PlacementManager {
public ApplicationPlacementContext placeApplication(
ApplicationSubmissionContext asc, String user) throws YarnException {
try {
readLock.lock();
try {
if (null == rules || rules.isEmpty()) {
return null;
}

View File

@ -1776,8 +1776,8 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override
public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
try {
this.readLock.lock();
try {
if (!isLogAggregationFinished() && isAppInFinalState(this) &&
systemClock.getTime() > this.logAggregationStartTime
+ this.logAggregationStatusTimeout) {
@ -1801,8 +1801,8 @@ public class RMAppImpl implements RMApp, Recoverable {
}
public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
try {
this.writeLock.lock();
try {
if (this.logAggregationEnabled && !isLogAggregationFinished()) {
LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
boolean stateChangedToFinal = false;
@ -1851,8 +1851,8 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override
public LogAggregationStatus getLogAggregationStatusForAppReport() {
try {
this.readLock.lock();
try {
if (! logAggregationEnabled) {
return LogAggregationStatus.DISABLED;
}
@ -2022,8 +2022,8 @@ public class RMAppImpl implements RMApp, Recoverable {
}
public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
try {
this.readLock.lock();
try {
List<String> failureMessages =
this.logAggregationFailureMessagesForNMs.get(nodeId);
if (failureMessages == null || failureMessages.isEmpty()) {

View File

@ -1587,8 +1587,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
&& this.getFinishTime() < (end - attemptFailuresValidityInterval)) {
return false;
}
try {
this.readLock.lock();
try {
int exitStatus = getAMContainerExitStatus();
return !(exitStatus == ContainerExitStatus.PREEMPTED
|| exitStatus == ContainerExitStatus.ABORTED
@ -2274,8 +2274,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@Override
public long getFinishTime() {
try {
this.readLock.lock();
try {
return this.finishTime;
} finally {
this.readLock.unlock();
@ -2283,8 +2283,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
}
private void setFinishTime(long finishTime) {
try {
this.writeLock.lock();
try {
this.finishTime = finishTime;
} finally {
this.writeLock.unlock();

View File

@ -73,8 +73,8 @@ public class RMAppAttemptMetrics {
}
public void updatePreemptionInfo(Resource resource, RMContainer container) {
try {
writeLock.lock();
try {
resourcePreempted = Resources.addTo(resourcePreempted, resource);
} finally {
writeLock.unlock();
@ -97,8 +97,8 @@ public class RMAppAttemptMetrics {
}
public Resource getResourcePreempted() {
try {
readLock.lock();
try {
return Resource.newInstance(resourcePreempted);
} finally {
readLock.unlock();

View File

@ -307,8 +307,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public Resource getAllocatedResource() {
try {
readLock.lock();
try {
return container.getResource();
} finally {
readLock.unlock();
@ -317,8 +317,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public Resource getLastConfirmedResource() {
try {
readLock.lock();
try {
return this.lastConfirmedResource;
} finally {
readLock.unlock();
@ -347,8 +347,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public long getFinishTime() {
try {
readLock.lock();
try {
return finishTime;
} finally {
readLock.unlock();
@ -357,8 +357,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public String getDiagnosticsInfo() {
try {
readLock.lock();
try {
if (finishedStatus != null) {
return finishedStatus.getDiagnostics();
} else {
@ -371,8 +371,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public String getLogURL() {
try {
readLock.lock();
try {
StringBuilder logURL = new StringBuilder();
logURL.append(WebAppUtils.getHttpSchemePrefix(rmContext
.getYarnConfiguration()));
@ -387,8 +387,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public int getContainerExitStatus() {
try {
readLock.lock();
try {
if (finishedStatus != null) {
return finishedStatus.getExitStatus();
} else {
@ -401,8 +401,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public ContainerState getContainerState() {
try {
readLock.lock();
try {
if (finishedStatus != null) {
return finishedStatus.getState();
} else {
@ -415,8 +415,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public ContainerRequest getContainerRequest() {
try {
readLock.lock();
try {
return containerRequestForRecovery;
} finally {
readLock.unlock();
@ -439,8 +439,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public boolean isAMContainer() {
try {
readLock.lock();
try {
return isAMContainer;
} finally {
readLock.unlock();
@ -448,8 +448,8 @@ public class RMContainerImpl implements RMContainer {
}
public void setAMContainer(boolean isAMContainer) {
try {
writeLock.lock();
try {
this.isAMContainer = isAMContainer;
} finally {
writeLock.unlock();
@ -471,8 +471,9 @@ public class RMContainerImpl implements RMContainer {
LOG.debug("Processing " + event.getContainerId() + " of type " + event
.getType());
}
try {
writeLock.lock();
try {
RMContainerState oldState = getState();
try {
stateMachine.doTransition(event.getType(), event);
@ -810,8 +811,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public String getNodeHttpAddress() {
try {
readLock.lock();
try {
if (container.getNodeHttpAddress() != null) {
StringBuilder httpAddress = new StringBuilder();
httpAddress.append(WebAppUtils.getHttpSchemePrefix(rmContext
@ -894,8 +895,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public Resource getAllocatedOrReservedResource() {
try {
readLock.lock();
try {
if (getState().equals(RMContainerState.RESERVED)) {
return getReservedResource();
} else {

View File

@ -670,8 +670,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
public void handle(RMNodeEvent event) {
LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType());
try {
writeLock.lock();
try {
NodeState oldState = getState();
try {
stateMachine.doTransition(event.getType(), event);
@ -1515,9 +1515,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
@Override
public List<Container> pullNewlyIncreasedContainers() {
try {
writeLock.lock();
try {
if (nmReportedIncreasedContainers.isEmpty()) {
return Collections.emptyList();
} else {

View File

@ -118,8 +118,8 @@ public class AbstractResourceUsage {
return normalize(noLabelUsages.resArr.get(type.idx));
}
try {
readLock.lock();
try {
UsageByLabel usage = usages.get(label);
if (null == usage) {
return Resources.none();
@ -131,8 +131,8 @@ public class AbstractResourceUsage {
}
protected Resource _getAll(ResourceType type) {
try {
readLock.lock();
try {
Resource allOfType = Resources.createResource(0);
for (Map.Entry<String, UsageByLabel> usageEntry : usages.entrySet()) {
// all usages types are initialized
@ -159,8 +159,8 @@ public class AbstractResourceUsage {
}
protected void _set(String label, ResourceType type, Resource res) {
try {
writeLock.lock();
try {
UsageByLabel usage = getAndAddIfMissing(label);
usage.resArr.set(type.idx, res);
} finally {
@ -169,8 +169,8 @@ public class AbstractResourceUsage {
}
protected void _inc(String label, ResourceType type, Resource res) {
try {
writeLock.lock();
try {
UsageByLabel usage = getAndAddIfMissing(label);
usage.resArr.set(type.idx,
Resources.add(usage.resArr.get(type.idx), res));
@ -180,8 +180,8 @@ public class AbstractResourceUsage {
}
protected void _dec(String label, ResourceType type, Resource res) {
try {
writeLock.lock();
try {
UsageByLabel usage = getAndAddIfMissing(label);
usage.resArr.set(type.idx,
Resources.subtract(usage.resArr.get(type.idx), res));
@ -192,8 +192,8 @@ public class AbstractResourceUsage {
@Override
public String toString() {
try {
readLock.lock();
try {
return usages.toString();
} finally {
readLock.unlock();
@ -201,8 +201,8 @@ public class AbstractResourceUsage {
}
public Set<String> getNodePartitionsSet() {
try {
readLock.lock();
try {
return usages.keySet();
} finally {
readLock.unlock();

View File

@ -343,8 +343,8 @@ public abstract class AbstractYarnScheduler
protected void containerLaunchedOnNode(
ContainerId containerId, SchedulerNode node) {
try {
readLock.lock();
try {
// Get the application for the finished container
SchedulerApplicationAttempt application =
getCurrentAttemptForContainer(containerId);
@ -485,8 +485,8 @@ public abstract class AbstractYarnScheduler
public void recoverContainersOnNode(List<NMContainerStatus> containerReports,
RMNode nm) {
try {
writeLock.lock();
try {
if (!rmContext.isWorkPreservingRecoveryEnabled()
|| containerReports == null || (containerReports != null
&& containerReports.isEmpty())) {
@ -767,8 +767,8 @@ public abstract class AbstractYarnScheduler
@Override
public void moveAllApps(String sourceQueue, String destQueue)
throws YarnException {
try {
writeLock.lock();
try {
// check if destination queue is a valid leaf queue
try {
getQueueInfo(destQueue, false, false);
@ -798,8 +798,8 @@ public abstract class AbstractYarnScheduler
@Override
public void killAllAppsInQueue(String queueName)
throws YarnException {
try {
writeLock.lock();
try {
// check if queue is a valid
List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
if (apps == null) {
@ -824,8 +824,8 @@ public abstract class AbstractYarnScheduler
*/
public void updateNodeResource(RMNode nm,
ResourceOption resourceOption) {
try {
writeLock.lock();
try {
SchedulerNode node = getSchedulerNode(nm.getNodeID());
Resource newResource = resourceOption.getResource();
Resource oldResource = node.getTotalResource();

View File

@ -135,8 +135,8 @@ public class AppSchedulingInfo {
}
public String getQueueName() {
try {
this.readLock.lock();
try {
return queue.getQueueName();
} finally {
this.readLock.unlock();
@ -465,8 +465,8 @@ public class AppSchedulingInfo {
*/
public List<ResourceRequest> getAllResourceRequests() {
List<ResourceRequest> ret = new ArrayList<>();
try {
this.readLock.lock();
try {
for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
.values()) {
ret.addAll(ap.getResourceRequests().values());
@ -483,8 +483,8 @@ public class AppSchedulingInfo {
*/
public List<SchedulingRequest> getAllSchedulingRequests() {
List<SchedulingRequest> ret = new ArrayList<>();
try {
this.readLock.lock();
try {
schedulerKeyToAppPlacementAllocator.values().stream()
.filter(ap -> ap.getSchedulingRequest() != null)
.forEach(ap -> ret.add(ap.getSchedulingRequest()));
@ -495,8 +495,8 @@ public class AppSchedulingInfo {
}
public PendingAsk getNextPendingAsk() {
try {
readLock.lock();
try {
SchedulerRequestKey firstRequestKey = schedulerKeys.first();
return getPendingAsk(firstRequestKey, ResourceRequest.ANY);
} finally {
@ -511,8 +511,8 @@ public class AppSchedulingInfo {
public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey,
String resourceName) {
try {
this.readLock.lock();
try {
AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(
schedulerKey);
return (ap == null) ? PendingAsk.ZERO : ap.getPendingAsk(resourceName);
@ -547,9 +547,8 @@ public class AppSchedulingInfo {
public ContainerRequest allocate(NodeType type,
SchedulerNode node, SchedulerRequestKey schedulerKey,
Container containerAllocated) {
try {
writeLock.lock();
try {
if (null != containerAllocated) {
updateMetricsForAllocatedContainer(type, node, containerAllocated);
}
@ -568,8 +567,8 @@ public class AppSchedulingInfo {
}
public void move(Queue newQueue) {
try {
this.writeLock.lock();
try {
QueueMetrics oldMetrics = queue.getMetrics();
QueueMetrics newMetrics = newQueue.getMetrics();
for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
@ -607,8 +606,8 @@ public class AppSchedulingInfo {
public void stop() {
// clear pending resources metrics for the application
try {
this.writeLock.lock();
try {
QueueMetrics metrics = queue.getMetrics();
for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
.values()) {
@ -634,8 +633,8 @@ public class AppSchedulingInfo {
}
public void setQueue(Queue queue) {
try {
this.writeLock.lock();
try {
this.queue = queue;
} finally {
this.writeLock.unlock();
@ -663,8 +662,8 @@ public class AppSchedulingInfo {
if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) {
return;
}
try {
this.writeLock.lock();
try {
QueueMetrics metrics = queue.getMetrics();
if (pending) {
// If there was any container to recover, the application was
@ -691,8 +690,8 @@ public class AppSchedulingInfo {
*/
public boolean checkAllocation(NodeType type, SchedulerNode node,
SchedulerRequestKey schedulerKey) {
try {
readLock.lock();
try {
AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(
schedulerKey);
if (null == ap) {
@ -752,8 +751,8 @@ public class AppSchedulingInfo {
*/
public boolean canDelayTo(
SchedulerRequestKey schedulerKey, String resourceName) {
try {
this.readLock.lock();
try {
AppPlacementAllocator ap =
schedulerKeyToAppPlacementAllocator.get(schedulerKey);
return (ap == null) || ap.canDelayTo(resourceName);
@ -773,8 +772,8 @@ public class AppSchedulingInfo {
*/
public boolean precheckNode(SchedulerRequestKey schedulerKey,
SchedulerNode schedulerNode, SchedulingMode schedulingMode) {
try {
this.readLock.lock();
try {
AppPlacementAllocator ap =
schedulerKeyToAppPlacementAllocator.get(schedulerKey);
return (ap != null) && ap.precheckNode(schedulerNode,

View File

@ -73,8 +73,8 @@ public class ResourceUsage extends AbstractResourceUsage {
}
public void copyAllUsed(AbstractResourceUsage other) {
try {
writeLock.lock();
try {
for (Entry<String, UsageByLabel> entry : other.usages.entrySet()) {
setUsed(entry.getKey(), Resources.clone(entry.getValue().getUsed()));
}
@ -285,8 +285,8 @@ public class ResourceUsage extends AbstractResourceUsage {
}
public Resource getCachedDemand(String label) {
try {
readLock.lock();
try {
Resource demand = Resources.createResource(0);
Resources.addTo(demand, getCachedUsed(label));
Resources.addTo(demand, getCachedPending(label));

View File

@ -253,8 +253,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
* @return live containers of the application
*/
public Collection<RMContainer> getLiveContainers() {
try {
readLock.lock();
try {
return new ArrayList<>(liveContainers.values());
} finally {
readLock.unlock();
@ -307,8 +307,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public PendingAsk getPendingAsk(
SchedulerRequestKey schedulerKey, String resourceName) {
try {
readLock.lock();
try {
return appSchedulingInfo.getPendingAsk(schedulerKey, resourceName);
} finally {
readLock.unlock();
@ -321,8 +321,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public int getOutstandingAsksCount(SchedulerRequestKey schedulerKey,
String resourceName) {
try {
readLock.lock();
try {
AppPlacementAllocator ap = appSchedulingInfo.getAppPlacementAllocator(
schedulerKey);
return ap == null ? 0 : ap.getOutstandingAsksCount(resourceName);
@ -369,8 +369,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public void addRMContainer(
ContainerId id, RMContainer rmContainer) {
try {
writeLock.lock();
try {
if (!getApplicationAttemptId().equals(
rmContainer.getApplicationAttemptId()) &&
!liveContainers.containsKey(id)) {
@ -393,8 +393,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
public void removeRMContainer(ContainerId containerId) {
try {
writeLock.lock();
try {
RMContainer rmContainer = liveContainers.remove(containerId);
if (rmContainer != null) {
if (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
@ -446,8 +446,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public boolean updateResourceRequests(
List<ResourceRequest> requests) {
try {
writeLock.lock();
try {
if (!isStopped) {
return appSchedulingInfo.updateResourceRequests(requests, false);
}
@ -463,8 +463,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return false;
}
try {
writeLock.lock();
try {
if (!isStopped) {
return appSchedulingInfo.updateSchedulingRequests(requests, false);
}
@ -476,8 +476,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public void recoverResourceRequestsForContainer(
ContainerRequest containerRequest) {
try {
writeLock.lock();
try {
if (!isStopped) {
appSchedulingInfo.updateResourceRequests(
containerRequest.getResourceRequests(), true);
@ -488,8 +488,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
public void stop(RMAppAttemptState rmAppAttemptFinalState) {
try {
writeLock.lock();
try {
// Cleanup all scheduling information
isStopped = true;
appSchedulingInfo.stop();
@ -508,8 +508,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
*/
public List<RMContainer> getReservedContainers() {
List<RMContainer> list = new ArrayList<>();
try {
readLock.lock();
try {
for (Entry<SchedulerRequestKey, Map<NodeId, RMContainer>> e :
this.reservedContainers.entrySet()) {
list.addAll(e.getValue().values());
@ -524,8 +524,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public boolean reserveIncreasedContainer(SchedulerNode node,
SchedulerRequestKey schedulerKey, RMContainer rmContainer,
Resource reservedResource) {
try {
writeLock.lock();
try {
if (commonReserve(node, schedulerKey, rmContainer, reservedResource)) {
attemptResourceUsage.incReserved(node.getPartition(), reservedResource);
// succeeded
@ -573,8 +573,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public RMContainer reserve(SchedulerNode node,
SchedulerRequestKey schedulerKey, RMContainer rmContainer,
Container container) {
try {
writeLock.lock();
try {
// Create RMContainer if necessary
if (rmContainer == null) {
rmContainer = new RMContainerImpl(container, schedulerKey,
@ -617,8 +617,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public int getNumReservedContainers(
SchedulerRequestKey schedulerKey) {
try {
readLock.lock();
try {
Map<NodeId, RMContainer> map = this.reservedContainers.get(
schedulerKey);
return (map == null) ? 0 : map.size();
@ -630,8 +630,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
@SuppressWarnings("unchecked")
public void containerLaunchedOnNode(ContainerId containerId,
NodeId nodeId) {
try {
writeLock.lock();
try {
// Inform the container
RMContainer rmContainer = getRMContainer(containerId);
if (rmContainer == null) {
@ -650,8 +650,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public void showRequests() {
if (LOG.isDebugEnabled()) {
try {
readLock.lock();
try {
for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
AppPlacementAllocator ap = getAppPlacementAllocator(schedulerKey);
if (ap != null &&
@ -762,8 +762,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
* </code>.
*/
List<RMContainer> pullContainersToTransfer() {
try {
writeLock.lock();
try {
recoveredPreviousAttemptContainers.clear();
return new ArrayList<>(liveContainers.values());
} finally {
@ -777,8 +777,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
* <code>AllocateResponse#containersFromPreviousAttempts</code>.
*/
public List<Container> pullPreviousAttemptContainers() {
try {
writeLock.lock();
try {
if (recoveredPreviousAttemptContainers.isEmpty()) {
return null;
}
@ -796,8 +796,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
// some reason like DNS unavailable, do not return this container and keep it
// in the newlyAllocatedContainers waiting to be refetched.
public List<Container> pullNewlyAllocatedContainers() {
try {
writeLock.lock();
try {
List<Container> returnContainerList = new ArrayList<Container>(
newlyAllocatedContainers.size());
@ -912,8 +912,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|| ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateTpe)) {
return updatedContainers;
}
try {
writeLock.lock();
try {
Iterator<Map.Entry<ContainerId, RMContainer>> i =
newlyUpdatedContainers.entrySet().iterator();
while (i.hasNext()) {
@ -960,8 +961,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
public List<NMToken> pullUpdatedNMTokens() {
try {
writeLock.lock();
try {
List <NMToken> returnList = new ArrayList<>(updatedNMTokens);
updatedNMTokens.clear();
return returnList;
@ -979,8 +980,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public void updateBlacklist(List<String> blacklistAdditions,
List<String> blacklistRemovals) {
try {
writeLock.lock();
try {
if (!isStopped) {
if (isWaitingForAMContainer()) {
// The request is for the AM-container, and the AM-container is
@ -999,8 +1000,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
public boolean isPlaceBlacklisted(String resourceName) {
try {
readLock.lock();
try {
boolean forAMContainer = isWaitingForAMContainer();
return this.appSchedulingInfo.isPlaceBlacklisted(resourceName,
forAMContainer);
@ -1103,8 +1104,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
public ApplicationResourceUsageReport getResourceUsageReport() {
try {
writeLock.lock();
try {
AggregateAppResourceUsage runningResourceUsage =
getRunningAggregateAppResourceUsage();
Resource usedResourceClone = Resources.clone(
@ -1154,8 +1155,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public void transferStateFromPreviousAttempt(
SchedulerApplicationAttempt appAttempt) {
try {
writeLock.lock();
try {
this.liveContainers = appAttempt.getLiveContainersMap();
// this.reReservations = appAttempt.reReservations;
this.attemptResourceUsage.copyAllUsed(appAttempt.attemptResourceUsage);
@ -1172,8 +1173,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
public void move(Queue newQueue) {
try {
writeLock.lock();
try {
QueueMetrics oldMetrics = queue.getMetrics();
QueueMetrics newMetrics = newQueue.getMetrics();
String newQueueName = newQueue.getQueueName();
@ -1209,8 +1210,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public void recoverContainer(SchedulerNode node,
RMContainer rmContainer) {
try {
writeLock.lock();
try {
// recover app scheduling info
appSchedulingInfo.recoverContainer(rmContainer, node.getPartition());

View File

@ -77,8 +77,8 @@ public class AbstractAutoCreatedLeafQueue extends LeafQueue {
*/
public void setEntitlement(String nodeLabel, QueueEntitlement entitlement)
throws SchedulerDynamicEditException {
try {
writeLock.lock();
try {
float capacity = entitlement.getCapacity();
if (capacity < 0 || capacity > 1.0f) {
throw new SchedulerDynamicEditException(

View File

@ -279,8 +279,8 @@ public abstract class AbstractCSQueue implements CSQueue {
* @param maximumCapacity new max capacity
*/
void setMaxCapacity(float maximumCapacity) {
try {
writeLock.lock();
try {
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(),
queueCapacities.getCapacity(), maximumCapacity);
@ -301,8 +301,8 @@ public abstract class AbstractCSQueue implements CSQueue {
* @param maximumCapacity new max capacity
*/
void setMaxCapacity(String nodeLabel, float maximumCapacity) {
try {
writeLock.lock();
try {
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(),
queueCapacities.getCapacity(nodeLabel), maximumCapacity);
@ -333,8 +333,8 @@ public abstract class AbstractCSQueue implements CSQueue {
CapacitySchedulerConfiguration configuration) throws
IOException {
try {
writeLock.lock();
try {
// get labels
this.accessibleLabels =
configuration.getAccessibleNodeLabels(getQueuePath());
@ -750,8 +750,8 @@ public abstract class AbstractCSQueue implements CSQueue {
void allocateResource(Resource clusterResource,
Resource resource, String nodePartition) {
try {
writeLock.lock();
try {
queueUsage.incUsed(nodePartition, resource);
++numContainers;
@ -765,8 +765,8 @@ public abstract class AbstractCSQueue implements CSQueue {
protected void releaseResource(Resource clusterResource,
Resource resource, String nodePartition) {
try {
writeLock.lock();
try {
queueUsage.decUsed(nodePartition, resource);
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
@ -785,8 +785,8 @@ public abstract class AbstractCSQueue implements CSQueue {
@Private
public Map<AccessType, AccessControlList> getACLs() {
try {
readLock.lock();
try {
return acls;
} finally {
readLock.unlock();
@ -938,8 +938,8 @@ public abstract class AbstractCSQueue implements CSQueue {
boolean canAssignToThisQueue(Resource clusterResource,
String nodePartition, ResourceLimits currentResourceLimits,
Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) {
try {
readLock.lock();
try {
// Get current limited resource:
// - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
// queues' max capacity.
@ -1203,9 +1203,8 @@ public abstract class AbstractCSQueue implements CSQueue {
Resource netAllocated = Resources.subtract(required,
request.getTotalReleasedResource());
try {
readLock.lock();
try {
String partition = schedulerContainer.getNodePartition();
Resource maxResourceLimit;
if (allocation.getSchedulingMode()
@ -1254,8 +1253,8 @@ public abstract class AbstractCSQueue implements CSQueue {
@Override
public void activeQueue() throws YarnException {
try {
this.writeLock.lock();
try {
if (getState() == QueueState.RUNNING) {
LOG.info("The specified queue:" + queueName
+ " is already in the RUNNING state.");
@ -1278,8 +1277,8 @@ public abstract class AbstractCSQueue implements CSQueue {
}
protected void appFinished() {
try {
this.writeLock.lock();
try {
if (getState() == QueueState.DRAINING) {
if (getNumApplications() == 0) {
updateQueueState(QueueState.STOPPED);
@ -1301,8 +1300,8 @@ public abstract class AbstractCSQueue implements CSQueue {
}
public void recoverDrainingState() {
try {
this.writeLock.lock();
try {
if (getState() == QueueState.STOPPED) {
updateQueueState(QueueState.DRAINING);
}

View File

@ -54,9 +54,8 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
@Override
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException {
try {
writeLock.lock();
try {
// Set new configs
setupQueueConfigs(clusterResource);
@ -72,8 +71,8 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
*/
public void addChildQueue(CSQueue childQueue)
throws SchedulerDynamicEditException, IOException {
try {
writeLock.lock();
try {
if (childQueue.getCapacity() > 0) {
throw new SchedulerDynamicEditException(
"Queue " + childQueue + " being added has non zero capacity.");
@ -95,8 +94,8 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
*/
public void removeChildQueue(CSQueue childQueue)
throws SchedulerDynamicEditException {
try {
writeLock.lock();
try {
if (childQueue.getCapacity() > 0) {
throw new SchedulerDynamicEditException(
"Queue " + childQueue + " being removed has non zero capacity.");
@ -124,8 +123,8 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
public CSQueue removeChildQueue(String childQueueName)
throws SchedulerDynamicEditException {
CSQueue childQueue;
try {
writeLock.lock();
try {
childQueue = this.csContext.getCapacitySchedulerQueueManager().getQueue(
childQueueName);
if (childQueue != null) {
@ -141,8 +140,8 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
}
protected float sumOfChildCapacities() {
try {
writeLock.lock();
try {
float ret = 0;
for (CSQueue l : childQueues) {
ret += l.getCapacity();
@ -154,8 +153,8 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
}
protected float sumOfChildAbsCapacities() {
try {
writeLock.lock();
try {
float ret = 0;
for (CSQueue l : childQueues) {
ret += l.getAbsoluteCapacity();

View File

@ -49,9 +49,8 @@ public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue {
@Override
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException {
try {
writeLock.lock();
try {
validate(newlyParsedQueue);
ManagedParentQueue managedParentQueue = (ManagedParentQueue) parent;
@ -72,8 +71,8 @@ public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue {
public void reinitializeFromTemplate(AutoCreatedLeafQueueConfig
leafQueueTemplate) throws SchedulerDynamicEditException, IOException {
try {
writeLock.lock();
try {
// TODO:
// reinitialize only capacities for now since 0 capacity updates

View File

@ -322,8 +322,8 @@ public class CapacityScheduler extends
@VisibleForTesting
void initScheduler(Configuration configuration) throws
IOException {
try {
writeLock.lock();
try {
String confProviderStr = configuration.get(
YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
YarnConfiguration.DEFAULT_CONFIGURATION_STORE);
@ -421,8 +421,8 @@ public class CapacityScheduler extends
}
private void startSchedulerThreads() {
try {
writeLock.lock();
try {
activitiesManager.start();
if (scheduleAsynchronously) {
Preconditions.checkNotNull(asyncSchedulerThreads,
@ -455,8 +455,8 @@ public class CapacityScheduler extends
@Override
public void serviceStop() throws Exception {
try {
writeLock.lock();
try {
this.activitiesManager.stop();
if (scheduleAsynchronously && asyncSchedulerThreads != null) {
for (Thread t : asyncSchedulerThreads) {
@ -479,8 +479,8 @@ public class CapacityScheduler extends
@Override
public void reinitialize(Configuration newConf, RMContext rmContext)
throws IOException {
try {
writeLock.lock();
try {
Configuration configuration = new Configuration(newConf);
CapacitySchedulerConfiguration oldConf = this.conf;
this.conf = csConfProvider.loadConfiguration(configuration);
@ -656,9 +656,8 @@ public class CapacityScheduler extends
try {
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
backlogs.take();
try {
cs.writeLock.lock();
try {
cs.tryCommit(cs.getClusterResource(), request, true);
} finally {
cs.writeLock.unlock();
@ -684,8 +683,8 @@ public class CapacityScheduler extends
@VisibleForTesting
public PlacementRule getUserGroupMappingPlacementRule() throws IOException {
try {
readLock.lock();
try {
UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule();
ugRule.initialize(this);
return ugRule;
@ -695,8 +694,8 @@ public class CapacityScheduler extends
}
public PlacementRule getAppNameMappingPlacementRule() throws IOException {
try {
readLock.lock();
try {
AppNameMappingPlacementRule anRule = new AppNameMappingPlacementRule();
anRule.initialize(this);
return anRule;
@ -796,8 +795,8 @@ public class CapacityScheduler extends
private void addApplicationOnRecovery(ApplicationId applicationId,
String queueName, String user,
Priority priority, ApplicationPlacementContext placementContext) {
try {
writeLock.lock();
try {
//check if the queue needs to be auto-created during recovery
CSQueue queue = getOrCreateQueueFromPlacementContext(applicationId, user,
queueName, placementContext, true);
@ -920,8 +919,8 @@ public class CapacityScheduler extends
private void addApplication(ApplicationId applicationId, String queueName,
String user, Priority priority,
ApplicationPlacementContext placementContext) {
try {
writeLock.lock();
try {
if (isSystemAppsLimitReached()) {
String message = "Maximum system application limit reached,"
+ "cannot accept submission of application: " + applicationId;
@ -1019,8 +1018,8 @@ public class CapacityScheduler extends
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt,
boolean isAttemptRecovering) {
try {
writeLock.lock();
try {
SchedulerApplication<FiCaSchedulerApp> application = applications.get(
applicationAttemptId.getApplicationId());
if (application == null) {
@ -1072,8 +1071,8 @@ public class CapacityScheduler extends
private void doneApplication(ApplicationId applicationId,
RMAppState finalState) {
try {
writeLock.lock();
try {
SchedulerApplication<FiCaSchedulerApp> application = applications.get(
applicationId);
if (application == null) {
@ -1099,8 +1098,8 @@ public class CapacityScheduler extends
private void doneApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
try {
writeLock.lock();
try {
LOG.info("Application Attempt " + applicationAttemptId + " is done."
+ " finalState=" + rmAppAttemptFinalState);
@ -1214,8 +1213,8 @@ public class CapacityScheduler extends
// make sure we aren't stopping/removing the application
// when the allocate comes in
try {
application.getWriteLock().lock();
try {
if (application.isStopped()) {
return EMPTY_ALLOCATION;
}
@ -1292,8 +1291,8 @@ public class CapacityScheduler extends
@Override
protected void nodeUpdate(RMNode rmNode) {
long begin = System.nanoTime();
try {
readLock.lock();
try {
setLastNodeUpdateTime(Time.now());
super.nodeUpdate(rmNode);
} finally {
@ -1302,8 +1301,8 @@ public class CapacityScheduler extends
// Try to do scheduling
if (!scheduleAsynchronously) {
try {
writeLock.lock();
try {
ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
rmNode.getNodeID());
@ -1329,8 +1328,8 @@ public class CapacityScheduler extends
*/
private void updateNodeAndQueueResource(RMNode nm,
ResourceOption resourceOption) {
try {
writeLock.lock();
try {
updateNodeResource(nm, resourceOption);
Resource clusterResource = getClusterResource();
getRootQueue().updateClusterResource(clusterResource,
@ -1917,8 +1916,8 @@ public class CapacityScheduler extends
private void updateNodeAttributes(
NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) {
try {
writeLock.lock();
try {
for (Entry<String, Set<NodeAttribute>> entry : attributeUpdateEvent
.getUpdatedNodeToAttributes().entrySet()) {
String hostname = entry.getKey();
@ -1944,8 +1943,8 @@ public class CapacityScheduler extends
*/
private void updateNodeLabelsAndQueueResource(
NodeLabelsUpdateSchedulerEvent labelUpdateEvent) {
try {
writeLock.lock();
try {
Set<String> updateLabels = new HashSet<String>();
for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
.getUpdatedNodeToLabels().entrySet()) {
@ -1982,8 +1981,8 @@ public class CapacityScheduler extends
}
private void addNode(RMNode nodeManager) {
try {
writeLock.lock();
try {
FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
usePortForNodeName, nodeManager.getNodeLabels());
nodeTracker.addNode(schedulerNode);
@ -2019,8 +2018,8 @@ public class CapacityScheduler extends
}
private void removeNode(RMNode nodeInfo) {
try {
writeLock.lock();
try {
// update this node to node label manager
if (labelManager != null) {
labelManager.deactivateNode(nodeInfo.getNodeID());
@ -2164,8 +2163,8 @@ public class CapacityScheduler extends
public void markContainerForKillable(
RMContainer killableContainer) {
try {
writeLock.lock();
try {
if (LOG.isDebugEnabled()) {
LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE + ": container"
+ killableContainer.toString());
@ -2200,8 +2199,8 @@ public class CapacityScheduler extends
private void markContainerForNonKillable(
RMContainer nonKillableContainer) {
try {
writeLock.lock();
try {
if (LOG.isDebugEnabled()) {
LOG.debug(
SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE + ": container"
@ -2269,8 +2268,8 @@ public class CapacityScheduler extends
private String resolveReservationQueueName(String queueName,
ApplicationId applicationId, ReservationId reservationID,
boolean isRecovering) {
try {
readLock.lock();
try {
CSQueue queue = getQueue(queueName);
// Check if the queue is a plan queue
if ((queue == null) || !(queue instanceof PlanQueue)) {
@ -2320,8 +2319,8 @@ public class CapacityScheduler extends
@Override
public void removeQueue(String queueName)
throws SchedulerDynamicEditException {
try {
writeLock.lock();
try {
LOG.info("Removing queue: " + queueName);
CSQueue q = this.getQueue(queueName);
if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(
@ -2354,8 +2353,8 @@ public class CapacityScheduler extends
@Override
public void addQueue(Queue queue)
throws SchedulerDynamicEditException, IOException {
try {
writeLock.lock();
try {
if (queue == null) {
throw new SchedulerDynamicEditException(
"Queue specified is null. Should be an implementation of "
@ -2392,8 +2391,8 @@ public class CapacityScheduler extends
@Override
public void setEntitlement(String inQueue, QueueEntitlement entitlement)
throws YarnException {
try {
writeLock.lock();
try {
LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
AbstractManagedParentQueue parent =
(AbstractManagedParentQueue) queue.getParent();
@ -2429,8 +2428,8 @@ public class CapacityScheduler extends
@Override
public String moveApplication(ApplicationId appId,
String targetQueueName) throws YarnException {
try {
writeLock.lock();
try {
SchedulerApplication<FiCaSchedulerApp> application =
applications.get(appId);
if (application == null) {
@ -2481,8 +2480,8 @@ public class CapacityScheduler extends
@Override
public void preValidateMoveApplication(ApplicationId appId,
String newQueue) throws YarnException {
try {
writeLock.lock();
try {
SchedulerApplication<FiCaSchedulerApp> application =
applications.get(appId);
if (application == null) {
@ -2604,8 +2603,8 @@ public class CapacityScheduler extends
public Priority checkAndGetApplicationPriority(
Priority priorityRequestedByApp, UserGroupInformation user,
String queueName, ApplicationId applicationId) throws YarnException {
try {
readLock.lock();
try {
Priority appPriority = priorityRequestedByApp;
// Verify the scenario where priority is null from submissionContext.
@ -2660,8 +2659,8 @@ public class CapacityScheduler extends
ApplicationId applicationId, SettableFuture<Object> future,
UserGroupInformation user)
throws YarnException {
try {
writeLock.lock();
try {
Priority appPriority = null;
SchedulerApplication<FiCaSchedulerApp> application = applications
.get(applicationId);
@ -3065,9 +3064,8 @@ public class CapacityScheduler extends
*/
public boolean moveReservedContainer(RMContainer toBeMovedContainer,
FiCaSchedulerNode targetNode) {
try {
writeLock.lock();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to move container=" + toBeMovedContainer + " to node="
+ targetNode.getNodeID());
@ -3121,8 +3119,8 @@ public class CapacityScheduler extends
@Override
public long checkAndGetApplicationLifetime(String queueName,
long lifetimeRequestedByApp) {
try {
readLock.lock();
try {
CSQueue queue = getQueue(queueName);
if (queue == null || !(queue instanceof LeafQueue)) {
return lifetimeRequestedByApp;

View File

@ -169,8 +169,8 @@ public class LeafQueue extends AbstractCSQueue {
protected void setupQueueConfigs(Resource clusterResource,
CapacitySchedulerConfiguration conf) throws
IOException {
try {
writeLock.lock();
try {
CapacitySchedulerConfiguration schedConf = csContext.getConfiguration();
super.setupQueueConfigs(clusterResource, conf);
@ -402,8 +402,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public int getNumApplications() {
try {
readLock.lock();
try {
return getNumPendingApplications() + getNumActiveApplications();
} finally {
readLock.unlock();
@ -411,8 +411,8 @@ public class LeafQueue extends AbstractCSQueue {
}
public int getNumPendingApplications() {
try {
readLock.lock();
try {
return pendingOrderingPolicy.getNumSchedulableEntities();
} finally {
readLock.unlock();
@ -420,8 +420,8 @@ public class LeafQueue extends AbstractCSQueue {
}
public int getNumActiveApplications() {
try {
readLock.lock();
try {
return orderingPolicy.getNumSchedulableEntities();
} finally {
readLock.unlock();
@ -430,8 +430,8 @@ public class LeafQueue extends AbstractCSQueue {
@Private
public int getNumPendingApplications(String user) {
try {
readLock.lock();
try {
User u = getUser(user);
if (null == u) {
return 0;
@ -444,8 +444,8 @@ public class LeafQueue extends AbstractCSQueue {
@Private
public int getNumActiveApplications(String user) {
try {
readLock.lock();
try {
User u = getUser(user);
if (null == u) {
return 0;
@ -476,8 +476,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public List<QueueUserACLInfo>
getQueueUserAclInfo(UserGroupInformation user) {
try {
readLock.lock();
try {
QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance(
QueueUserACLInfo.class);
List<QueueACL> operations = new ArrayList<>();
@ -497,8 +497,8 @@ public class LeafQueue extends AbstractCSQueue {
}
public String toString() {
try {
readLock.lock();
try {
return queueName + ": " + "capacity=" + queueCapacities.getCapacity()
+ ", " + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity()
+ ", " + "usedResources=" + queueUsage.getUsed() + ", "
@ -522,8 +522,8 @@ public class LeafQueue extends AbstractCSQueue {
@Private
public List<AppPriorityACLGroup> getPriorityACLs() {
try {
readLock.lock();
try {
return new ArrayList<>(priorityAcls);
} finally {
readLock.unlock();
@ -535,8 +535,8 @@ public class LeafQueue extends AbstractCSQueue {
CapacitySchedulerConfiguration configuration) throws
IOException {
try {
writeLock.lock();
try {
// Sanity check
if (!(newlyParsedQueue instanceof LeafQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
@ -582,9 +582,8 @@ public class LeafQueue extends AbstractCSQueue {
public void submitApplicationAttempt(FiCaSchedulerApp application,
String userName) {
// Careful! Locking order is important!
try {
writeLock.lock();
try {
// TODO, should use getUser, use this method just to avoid UT failure
// which is caused by wrong invoking order, will fix UT separately
User user = usersManager.getUserAndAddIfAbsent(userName);
@ -622,8 +621,8 @@ public class LeafQueue extends AbstractCSQueue {
public void validateSubmitApplication(ApplicationId applicationId,
String userName, String queue) throws AccessControlException {
try {
writeLock.lock();
try {
// Check if the queue is accepting jobs
if (getState() != QueueState.RUNNING) {
String msg = "Queue " + getQueuePath()
@ -691,8 +690,9 @@ public class LeafQueue extends AbstractCSQueue {
if (userName != null && getUser(userName) != null) {
userWeight = getUser(userName).getWeight();
}
try {
readLock.lock();
try {
/*
* The user am resource limit is based on the same approach as the user
* limit (as it should represent a subset of that). This means that it uses
@ -741,8 +741,8 @@ public class LeafQueue extends AbstractCSQueue {
public Resource calculateAndGetAMResourceLimitPerPartition(
String nodePartition) {
try {
writeLock.lock();
try {
/*
* For non-labeled partition, get the max value from resources currently
* available to the queue and the absolute resources guaranteed for the
@ -794,8 +794,8 @@ public class LeafQueue extends AbstractCSQueue {
}
protected void activateApplications() {
try {
writeLock.lock();
try {
// limit of allowed resource usage for application masters
Map<String, Resource> userAmPartitionLimit =
new HashMap<String, Resource>();
@ -916,8 +916,8 @@ public class LeafQueue extends AbstractCSQueue {
private void addApplicationAttempt(FiCaSchedulerApp application,
User user) {
try {
writeLock.lock();
try {
// Accept
user.submitApplication();
getPendingAppsOrderingPolicy().addSchedulableEntity(application);
@ -969,9 +969,9 @@ public class LeafQueue extends AbstractCSQueue {
private void removeApplicationAttempt(
FiCaSchedulerApp application, String userName) {
try {
writeLock.lock();
writeLock.lock();
try {
// TODO, should use getUser, use this method just to avoid UT failure
// which is caused by wrong invoking order, will fix UT separately
User user = usersManager.getUserAndAddIfAbsent(userName);
@ -1228,8 +1228,8 @@ public class LeafQueue extends AbstractCSQueue {
// Do not check limits when allocation from a reserved container
if (allocation.getAllocateFromReservedContainer() == null) {
try {
readLock.lock();
try {
FiCaSchedulerApp app =
schedulerContainer.getSchedulerApplicationAttempt();
String username = app.getUser();
@ -1329,9 +1329,8 @@ public class LeafQueue extends AbstractCSQueue {
releaseContainers(cluster, request);
try {
writeLock.lock();
try {
if (request.anythingAllocatedOrReserved()) {
ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
allocation = request.getFirstAllocatedOrReservedContainer();
@ -1549,8 +1548,9 @@ public class LeafQueue extends AbstractCSQueue {
protected boolean canAssignToUser(Resource clusterResource,
String userName, Resource limit, FiCaSchedulerApp application,
String nodePartition, ResourceLimits currentResourceLimits) {
try {
readLock.lock();
try {
User user = getUser(userName);
if (user == null) {
if (LOG.isDebugEnabled()) {
@ -1631,8 +1631,8 @@ public class LeafQueue extends AbstractCSQueue {
*/
public void recalculateQueueUsageRatio(Resource clusterResource,
String nodePartition) {
try {
writeLock.lock();
try {
ResourceUsage queueResourceUsage = getQueueResourceUsage();
if (nodePartition == null) {
@ -1661,8 +1661,8 @@ public class LeafQueue extends AbstractCSQueue {
boolean removed = false;
// Careful! Locking order is important!
try {
writeLock.lock();
try {
Container container = rmContainer.getContainer();
// Inform the application & the node
@ -1714,8 +1714,8 @@ public class LeafQueue extends AbstractCSQueue {
void allocateResource(Resource clusterResource,
SchedulerApplicationAttempt application, Resource resource,
String nodePartition, RMContainer rmContainer) {
try {
writeLock.lock();
try {
super.allocateResource(clusterResource, resource, nodePartition);
// handle ignore exclusivity container
@ -1759,8 +1759,8 @@ public class LeafQueue extends AbstractCSQueue {
void releaseResource(Resource clusterResource,
FiCaSchedulerApp application, Resource resource, String nodePartition,
RMContainer rmContainer) {
try {
writeLock.lock();
try {
super.releaseResource(clusterResource, resource, nodePartition);
// handle ignore exclusivity container
@ -1815,8 +1815,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public void updateClusterResource(Resource clusterResource,
ResourceLimits currentResourceLimits) {
try {
writeLock.lock();
try {
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
lastClusterResource = clusterResource;
@ -1898,8 +1898,8 @@ public class LeafQueue extends AbstractCSQueue {
return;
}
// Careful! Locking order is important!
try {
writeLock.lock();
try {
FiCaSchedulerNode node = scheduler.getNode(
rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, attempt,
@ -1962,8 +1962,8 @@ public class LeafQueue extends AbstractCSQueue {
public Resource getTotalPendingResourcesConsideringUserLimit(
Resource clusterResources, String partition,
boolean deductReservedFromPending) {
try {
readLock.lock();
try {
Map<String, Resource> userNameToHeadroom =
new HashMap<>();
Resource totalPendingConsideringUserLimit = Resource.newInstance(0, 0);
@ -2006,8 +2006,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
try {
readLock.lock();
try {
for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy
.getSchedulableEntities()) {
apps.add(pendingApp.getApplicationAttemptId());
@ -2066,9 +2066,9 @@ public class LeafQueue extends AbstractCSQueue {
public Map<String, TreeSet<RMContainer>>
getIgnoreExclusivityRMContainers() {
Map<String, TreeSet<RMContainer>> clonedMap = new HashMap<>();
try {
readLock.lock();
readLock.lock();
try {
for (Map.Entry<String, TreeSet<RMContainer>> entry : ignorePartitionExclusivityRMContainers
.entrySet()) {
clonedMap.put(entry.getKey(), new TreeSet<>(entry.getValue()));
@ -2117,8 +2117,8 @@ public class LeafQueue extends AbstractCSQueue {
void setOrderingPolicy(
OrderingPolicy<FiCaSchedulerApp> orderingPolicy) {
try {
writeLock.lock();
try {
if (null != this.orderingPolicy) {
orderingPolicy.addAllSchedulableEntities(
this.orderingPolicy.getSchedulableEntities());
@ -2136,8 +2136,8 @@ public class LeafQueue extends AbstractCSQueue {
public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app,
Priority newAppPriority) {
try {
writeLock.lock();
try {
FiCaSchedulerApp attempt = app.getCurrentAppAttempt();
boolean isActive = orderingPolicy.removeSchedulableEntity(attempt);
if (!isActive) {
@ -2188,8 +2188,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public void stopQueue() {
try {
writeLock.lock();
try {
if (getNumApplications() > 0) {
updateQueueState(QueueState.DRAINING);
} else {

View File

@ -74,8 +74,8 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException {
try {
writeLock.lock();
try {
validate(newlyParsedQueue);
shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
@ -184,9 +184,9 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
@Override
public void addChildQueue(CSQueue childQueue)
throws SchedulerDynamicEditException, IOException {
try {
writeLock.lock();
writeLock.lock();
try {
if (childQueue == null || !(childQueue instanceof AutoCreatedLeafQueue)) {
throw new SchedulerDynamicEditException(
"Expected child queue to be an instance of AutoCreatedLeafQueue");
@ -231,8 +231,8 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
}
public List<FiCaSchedulerApp> getScheduleableApplications() {
try {
readLock.lock();
try {
List<FiCaSchedulerApp> apps = new ArrayList<>();
for (CSQueue childQueue : getChildQueues()) {
apps.addAll(((LeafQueue) childQueue).getApplications());
@ -244,8 +244,8 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
}
public List<FiCaSchedulerApp> getPendingApplications() {
try {
readLock.lock();
try {
List<FiCaSchedulerApp> apps = new ArrayList<>();
for (CSQueue childQueue : getChildQueues()) {
apps.addAll(((LeafQueue) childQueue).getPendingApplications());
@ -257,8 +257,8 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
}
public List<FiCaSchedulerApp> getAllApplications() {
try {
readLock.lock();
try {
List<FiCaSchedulerApp> apps = new ArrayList<>();
for (CSQueue childQueue : getChildQueues()) {
apps.addAll(((LeafQueue) childQueue).getAllApplications());
@ -286,9 +286,9 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
public void validateAndApplyQueueManagementChanges(
List<QueueManagementChange> queueManagementChanges)
throws IOException, SchedulerDynamicEditException {
try {
writeLock.lock();
writeLock.lock();
try {
validateQueueManagementChanges(queueManagementChanges);
applyQueueManagementChanges(queueManagementChanges);

View File

@ -126,8 +126,8 @@ public class ParentQueue extends AbstractCSQueue {
protected void setupQueueConfigs(Resource clusterResource)
throws IOException {
try {
writeLock.lock();
try {
super.setupQueueConfigs(clusterResource);
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
@ -166,8 +166,8 @@ public class ParentQueue extends AbstractCSQueue {
private static float PRECISION = 0.0005f; // 0.05% precision
void setChildQueues(Collection<CSQueue> childQueues) {
try {
writeLock.lock();
try {
// Validate
float childCapacities = 0;
Resource minResDefaultLabel = Resources.createResource(0, 0);
@ -257,8 +257,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public QueueInfo getQueueInfo(
boolean includeChildQueues, boolean recursive) {
try {
readLock.lock();
try {
QueueInfo queueInfo = getQueueInfo();
List<QueueInfo> childQueuesInfo = new ArrayList<>();
@ -279,8 +279,8 @@ public class ParentQueue extends AbstractCSQueue {
private QueueUserACLInfo getUserAclInfo(
UserGroupInformation user) {
try {
readLock.lock();
try {
QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance(
QueueUserACLInfo.class);
List<QueueACL> operations = new ArrayList<QueueACL>();
@ -302,8 +302,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public List<QueueUserACLInfo> getQueueUserAclInfo(
UserGroupInformation user) {
try {
readLock.lock();
try {
List<QueueUserACLInfo> userAcls = new ArrayList<>();
// Add parent queue acls
@ -335,8 +335,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
try {
writeLock.lock();
try {
// Sanity check
if (!(newlyParsedQueue instanceof ParentQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
@ -430,9 +430,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public void submitApplication(ApplicationId applicationId, String user,
String queue) throws AccessControlException {
try {
writeLock.lock();
try {
// Sanity check
validateSubmitApplication(applicationId, user, queue);
@ -456,8 +455,8 @@ public class ParentQueue extends AbstractCSQueue {
public void validateSubmitApplication(ApplicationId applicationId,
String userName, String queue) throws AccessControlException {
try {
writeLock.lock();
try {
if (queue.equals(queueName)) {
throw new AccessControlException(
"Cannot submit application " + "to non-leaf queue: " + queueName);
@ -487,9 +486,8 @@ public class ParentQueue extends AbstractCSQueue {
private void addApplication(ApplicationId applicationId,
String user) {
try {
writeLock.lock();
try {
++numApplications;
LOG.info(
@ -516,8 +514,8 @@ public class ParentQueue extends AbstractCSQueue {
private void removeApplication(ApplicationId applicationId,
String user) {
try {
writeLock.lock();
try {
--numApplications;
LOG.info("Application removed -" + " appId: " + applicationId + " user: "
@ -854,8 +852,8 @@ public class ParentQueue extends AbstractCSQueue {
private void internalReleaseResource(Resource clusterResource,
FiCaSchedulerNode node, Resource releasedResource) {
try {
writeLock.lock();
try {
super.releaseResource(clusterResource, releasedResource,
node.getPartition());
@ -891,9 +889,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public void updateClusterResource(Resource clusterResource,
ResourceLimits resourceLimits) {
try {
writeLock.lock();
try {
// Update effective capacity in all parent queue.
Set<String> configuredNodelabels = csContext.getConfiguration()
.getConfiguredNodeLabels(getQueuePath());
@ -1133,8 +1130,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public List<CSQueue> getChildQueues() {
try {
readLock.lock();
try {
return new ArrayList<CSQueue>(childQueues);
} finally {
readLock.unlock();
@ -1153,8 +1150,8 @@ public class ParentQueue extends AbstractCSQueue {
}
// Careful! Locking order is important!
try {
writeLock.lock();
try {
FiCaSchedulerNode node = scheduler.getNode(
rmContainer.getContainer().getNodeId());
allocateResource(clusterResource,
@ -1177,8 +1174,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
try {
readLock.lock();
try {
for (CSQueue queue : childQueues) {
queue.collectSchedulerApplications(apps);
}
@ -1233,8 +1230,8 @@ public class ParentQueue extends AbstractCSQueue {
void allocateResource(Resource clusterResource,
Resource resource, String nodePartition) {
try {
writeLock.lock();
try {
super.allocateResource(clusterResource, resource, nodePartition);
/**
@ -1331,8 +1328,8 @@ public class ParentQueue extends AbstractCSQueue {
// Do not modify queue when allocation from reserved container
if (allocation.getAllocateFromReservedContainer() == null) {
try {
writeLock.lock();
try {
// Book-keeping
// Note: Update headroom to account for current allocation too...
allocateResource(cluster, allocation.getAllocatedOrReservedResource(),
@ -1355,8 +1352,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public void stopQueue() {
try {
this.writeLock.lock();
try {
if (getNumApplications() > 0) {
updateQueueState(QueueState.DRAINING);
} else {

View File

@ -81,8 +81,8 @@ public class PlanQueue extends AbstractManagedParentQueue {
@Override
public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
try {
writeLock.lock();
try {
// Sanity check
if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {

View File

@ -83,8 +83,8 @@ public class QueueCapacities {
}
private float _get(String label, CapacityType type) {
try {
readLock.lock();
try {
Capacities cap = capacitiesMap.get(label);
if (null == cap) {
return LABEL_DOESNT_EXIST_CAP;
@ -96,8 +96,8 @@ public class QueueCapacities {
}
private void _set(String label, CapacityType type, float value) {
try {
writeLock.lock();
try {
Capacities cap = capacitiesMap.get(label);
if (null == cap) {
cap = new Capacities();
@ -277,8 +277,8 @@ public class QueueCapacities {
* configurable fields, and load new values
*/
public void clearConfigurableFields() {
try {
writeLock.lock();
try {
for (String label : capacitiesMap.keySet()) {
_set(label, CapacityType.CAP, 0);
_set(label, CapacityType.MAX_CAP, 0);
@ -291,8 +291,8 @@ public class QueueCapacities {
}
public Set<String> getExistingNodeLabels() {
try {
readLock.lock();
try {
return new HashSet<String>(capacitiesMap.keySet());
} finally {
readLock.unlock();
@ -301,8 +301,8 @@ public class QueueCapacities {
@Override
public String toString() {
try {
readLock.lock();
try {
return this.capacitiesMap.toString();
} finally {
readLock.unlock();
@ -310,8 +310,8 @@ public class QueueCapacities {
}
public Set<String> getNodePartitionsSet() {
try {
readLock.lock();
try {
return capacitiesMap.keySet();
} finally {
readLock.unlock();

View File

@ -53,8 +53,8 @@ public class ReservationQueue extends AbstractAutoCreatedLeafQueue {
@Override
public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
try {
writeLock.lock();
try {
// Sanity check
if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {

View File

@ -114,8 +114,8 @@ public class UsersManager implements AbstractUsersManager {
}
private void incUsageRatio(String label, float delta) {
try {
writeLock.lock();
try {
float usage = 0f;
if (usageRatios.containsKey(label)) {
usage = usageRatios.get(label);
@ -128,8 +128,8 @@ public class UsersManager implements AbstractUsersManager {
}
private float getUsageRatio(String label) {
try {
readLock.lock();
try {
Float f = usageRatios.get(label);
if (null == f) {
return 0.0f;
@ -141,8 +141,8 @@ public class UsersManager implements AbstractUsersManager {
}
private void setUsageRatio(String label, float ratio) {
try {
writeLock.lock();
try {
usageRatios.put(label, ratio);
} finally {
writeLock.unlock();
@ -179,8 +179,8 @@ public class UsersManager implements AbstractUsersManager {
public float setAndUpdateUsageRatio(ResourceCalculator resourceCalculator,
Resource resource, String nodePartition) {
try {
writeLock.lock();
try {
userUsageRatios.setUsageRatio(nodePartition, 0);
return updateUsageRatio(resourceCalculator, resource, nodePartition);
} finally {
@ -190,8 +190,8 @@ public class UsersManager implements AbstractUsersManager {
public float updateUsageRatio(ResourceCalculator resourceCalculator,
Resource resource, String nodePartition) {
try {
writeLock.lock();
try {
float delta;
float newRatio = Resources.ratio(resourceCalculator,
getUsed(nodePartition), resource);
@ -358,8 +358,8 @@ public class UsersManager implements AbstractUsersManager {
// If latestVersionOfUsersState is negative due to overflow, ideally we need
// to reset it. This method is invoked from UsersManager and LeafQueue and
// all is happening within write/readLock. Below logic can help to set 0.
try {
writeLock.lock();
try {
long value = latestVersionOfUsersState.incrementAndGet();
if (value < 0) {
@ -395,8 +395,8 @@ public class UsersManager implements AbstractUsersManager {
* User Name
*/
public void removeUser(String userName) {
try {
writeLock.lock();
try {
this.users.remove(userName);
// Remove user from active/non-active list as well.
@ -417,8 +417,8 @@ public class UsersManager implements AbstractUsersManager {
* @return User object
*/
public User getUserAndAddIfAbsent(String userName) {
try {
writeLock.lock();
try {
User u = getUser(userName);
if (null == u) {
u = new User(userName);
@ -448,8 +448,8 @@ public class UsersManager implements AbstractUsersManager {
* @return an ArrayList of UserInfo objects who are active in this queue
*/
public ArrayList<UserInfo> getUsersInfo() {
try {
readLock.lock();
try {
ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
for (Map.Entry<String, User> entry : getUsers().entrySet()) {
User user = entry.getValue();
@ -494,8 +494,8 @@ public class UsersManager implements AbstractUsersManager {
Map<SchedulingMode, Resource> userLimitPerSchedulingMode;
try {
writeLock.lock();
try {
userLimitPerSchedulingMode =
preComputedActiveUserLimit.get(nodePartition);
if (isRecomputeNeeded(schedulingMode, nodePartition, true)) {
@ -553,8 +553,8 @@ public class UsersManager implements AbstractUsersManager {
Map<SchedulingMode, Resource> userLimitPerSchedulingMode;
try {
writeLock.lock();
try {
userLimitPerSchedulingMode = preComputedAllUserLimit.get(nodePartition);
if (isRecomputeNeeded(schedulingMode, nodePartition, false)) {
// recompute
@ -602,8 +602,8 @@ public class UsersManager implements AbstractUsersManager {
*/
private void setLocalVersionOfUsersState(String nodePartition,
SchedulingMode schedulingMode, boolean isActive) {
try {
writeLock.lock();
try {
Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState = (isActive)
? localVersionOfActiveUsersState
: localVersionOfAllUsersState;
@ -626,8 +626,8 @@ public class UsersManager implements AbstractUsersManager {
*/
private long getLocalVersionOfUsersState(String nodePartition,
SchedulingMode schedulingMode, boolean isActive) {
try {
this.readLock.lock();
try {
Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState = (isActive)
? localVersionOfActiveUsersState
: localVersionOfAllUsersState;
@ -825,8 +825,8 @@ public class UsersManager implements AbstractUsersManager {
* Cluster Resource
*/
public void updateUsageRatio(String partition, Resource clusterResource) {
try {
writeLock.lock();
try {
Resource resourceByLabel = labelManager.getResourceByLabel(partition,
clusterResource);
float consumed = 0;
@ -852,9 +852,9 @@ public class UsersManager implements AbstractUsersManager {
@Override
public void activateApplication(String user, ApplicationId applicationId) {
try {
this.writeLock.lock();
this.writeLock.lock();
try {
User userDesc = getUser(user);
if (userDesc != null && userDesc.getActiveApplications() <= 0) {
return;
@ -885,9 +885,9 @@ public class UsersManager implements AbstractUsersManager {
@Override
public void deactivateApplication(String user, ApplicationId applicationId) {
try {
this.writeLock.lock();
this.writeLock.lock();
try {
Set<ApplicationId> userApps = usersApplications.get(user);
if (userApps != null) {
if (userApps.remove(applicationId)) {
@ -919,8 +919,8 @@ public class UsersManager implements AbstractUsersManager {
float sumActiveUsersTimesWeights() {
float count = 0.0f;
try {
this.readLock.lock();
try {
for (String u : activeUsersSet) {
count += getUser(u).getWeight();
}
@ -932,8 +932,8 @@ public class UsersManager implements AbstractUsersManager {
float sumAllUsersTimesWeights() {
float count = 0.0f;
try {
this.readLock.lock();
try {
for (String u : users.keySet()) {
count += getUser(u).getWeight();
}
@ -944,9 +944,8 @@ public class UsersManager implements AbstractUsersManager {
}
private void updateActiveUsersResourceUsage(String userName) {
try {
this.writeLock.lock();
try {
// For UT case: We might need to add the user to users list.
User user = getUserAndAddIfAbsent(userName);
ResourceUsage resourceUsage = user.getResourceUsage();
@ -983,8 +982,8 @@ public class UsersManager implements AbstractUsersManager {
}
private void updateNonActiveUsersResourceUsage(String userName) {
try {
this.writeLock.lock();
try {
// For UT case: We might need to add the user to users list.
User user = getUser(userName);
@ -1052,8 +1051,8 @@ public class UsersManager implements AbstractUsersManager {
*/
public User updateUserResourceUsage(String userName, Resource resource,
String nodePartition, boolean isAllocate) {
try {
this.writeLock.lock();
try {
// TODO, should use getUser, use this method just to avoid UT failure
// which is caused by wrong invoking order, will fix UT separately
@ -1099,8 +1098,8 @@ public class UsersManager implements AbstractUsersManager {
}
public void updateUserWeights() {
try {
this.writeLock.lock();
try {
for (Map.Entry<String, User> ue : users.entrySet()) {
ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey()));
}

View File

@ -43,8 +43,8 @@ public class PreemptionManager {
}
public void refreshQueues(CSQueue parent, CSQueue current) {
try {
writeLock.lock();
try {
PreemptableQueue parentEntity = null;
if (parent != null) {
parentEntity = entities.get(parent.getQueueName());
@ -67,8 +67,8 @@ public class PreemptionManager {
}
public void addKillableContainer(KillableContainer container) {
try {
writeLock.lock();
try {
PreemptableQueue entity = entities.get(container.getLeafQueueName());
if (null != entity) {
entity.addKillableContainer(container);
@ -80,8 +80,8 @@ public class PreemptionManager {
}
public void removeKillableContainer(KillableContainer container) {
try {
writeLock.lock();
try {
PreemptableQueue entity = entities.get(container.getLeafQueueName());
if (null != entity) {
entity.removeKillableContainer(container);
@ -106,8 +106,8 @@ public class PreemptionManager {
@VisibleForTesting
public Map<ContainerId, RMContainer> getKillableContainersMap(
String queueName, String partition) {
try {
readLock.lock();
try {
PreemptableQueue entity = entities.get(queueName);
if (entity != null) {
Map<ContainerId, RMContainer> containers =
@ -129,8 +129,8 @@ public class PreemptionManager {
}
public Resource getKillableResource(String queueName, String partition) {
try {
readLock.lock();
try {
PreemptableQueue entity = entities.get(queueName);
if (entity != null) {
Resource res = entity.getTotalKillableResources().get(partition);
@ -147,8 +147,8 @@ public class PreemptionManager {
}
public Map<String, PreemptableQueue> getShallowCopyOfPreemptableQueues() {
try {
readLock.lock();
try {
Map<String, PreemptableQueue> map = new HashMap<>();
for (Map.Entry<String, PreemptableQueue> entry : entities.entrySet()) {
String key = entry.getKey();

View File

@ -202,8 +202,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
new HashMap<String, Float>();
private float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) {
try {
readLock.lock();
try {
Float totalActivatedCapacity = getAbsActivatedChildQueueCapacityByLabel(
nodeLabel);
if (totalActivatedCapacity != null) {
@ -218,8 +218,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
private void incAbsoluteActivatedChildCapacity(String nodeLabel,
float childQueueCapacity) {
try {
writeLock.lock();
try {
Float activatedChildCapacity = getAbsActivatedChildQueueCapacityByLabel(
nodeLabel);
if (activatedChildCapacity != null) {
@ -236,8 +236,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
private void decAbsoluteActivatedChildCapacity(String nodeLabel,
float childQueueCapacity) {
try {
writeLock.lock();
try {
Float activatedChildCapacity = getAbsActivatedChildQueueCapacityByLabel(
nodeLabel);
if (activatedChildCapacity != null) {
@ -360,8 +360,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
//synch/add missing leaf queue(s) if any to state
updateLeafQueueState();
try {
readLock.lock();
try {
List<QueueManagementChange> queueManagementChanges = new ArrayList<>();
List<FiCaSchedulerApp> pendingApps = getSortedPendingApplications();
@ -483,8 +483,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
@VisibleForTesting
void updateLeafQueueState() {
try {
writeLock.lock();
try {
Set<String> newPartitions = new HashSet<>();
Set<String> newQueues = new HashSet<>();
@ -570,8 +570,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
@VisibleForTesting
public boolean isActive(final AutoCreatedLeafQueue leafQueue,
String nodeLabel) throws SchedulerDynamicEditException {
try {
readLock.lock();
try {
LeafQueueStatePerPartition leafQueueStatus = getLeafQueueState(leafQueue,
nodeLabel);
return leafQueueStatus.isActive();
@ -649,8 +649,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
public void commitQueueManagementChanges(
List<QueueManagementChange> queueManagementChanges)
throws SchedulerDynamicEditException {
try {
writeLock.lock();
try {
for (QueueManagementChange queueManagementChange :
queueManagementChanges) {
AutoCreatedLeafQueueConfig updatedQueueTemplate =
@ -695,8 +695,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
private void activate(final AbstractAutoCreatedLeafQueue leafQueue,
String nodeLabel) throws SchedulerDynamicEditException {
try {
writeLock.lock();
try {
getLeafQueueState(leafQueue, nodeLabel).activate();
parentQueueState.incAbsoluteActivatedChildCapacity(nodeLabel,
leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel));
@ -707,8 +707,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
private void deactivate(final AbstractAutoCreatedLeafQueue leafQueue,
String nodeLabel) throws SchedulerDynamicEditException {
try {
writeLock.lock();
try {
getLeafQueueState(leafQueue, nodeLabel).deactivate();
parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel,
@ -765,9 +765,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
.getClass());
}
try {
writeLock.lock();
try {
QueueCapacities capacities = new QueueCapacities(false);
for (String nodeLabel : leafQueueTemplateNodeLabels) {
if (!leafQueueState.createLeafQueueStateIfNotExists(leafQueue,
@ -816,8 +815,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
@VisibleForTesting
LeafQueueStatePerPartition getLeafQueueState(LeafQueue queue,
String partition) throws SchedulerDynamicEditException {
try {
readLock.lock();
try {
String queueName = queue.getQueueName();
if (!leafQueueState.containsLeafQueue(queueName, partition)) {
throw new SchedulerDynamicEditException(

View File

@ -201,8 +201,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
public boolean containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event,
String partition) {
try {
writeLock.lock();
try {
ContainerId containerId = rmContainer.getContainerId();
// Remove from the list of containers
@ -246,8 +246,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
public RMContainer allocate(FiCaSchedulerNode node,
SchedulerRequestKey schedulerKey, Container container) {
try {
readLock.lock();
try {
if (isStopped) {
return null;
@ -438,8 +438,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
ContainerRequest containerRequest = null;
boolean reReservation = false;
try {
readLock.lock();
try {
// First make sure no container in release list in final state
if (anyContainerInFinalState(request)) {
@ -561,8 +561,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
FiCaSchedulerNode> request, boolean updatePending) {
boolean reReservation = false;
try {
writeLock.lock();
try {
// If we allocated something
if (request.anythingAllocatedOrReserved()) {
@ -693,8 +693,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
public boolean unreserve(SchedulerRequestKey schedulerKey,
FiCaSchedulerNode node, RMContainer rmContainer) {
try {
writeLock.lock();
try {
// Done with the reservation?
if (internalUnreserve(node, schedulerKey)) {
node.unreserveResource(this);
@ -749,8 +749,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
public Map<String, Resource> getTotalPendingRequestsPerPartition() {
try {
readLock.lock();
try {
Map<String, Resource> ret = new HashMap<>();
for (SchedulerRequestKey schedulerKey : appSchedulingInfo
@ -781,8 +781,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
public void markContainerForPreemption(ContainerId cont) {
try {
writeLock.lock();
try {
// ignore already completed containers
if (liveContainers.containsKey(cont)) {
containersToPreempt.add(cont);
@ -804,8 +804,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
*/
public Allocation getAllocation(ResourceCalculator resourceCalculator,
Resource clusterResource, Resource minimumAllocation) {
try {
writeLock.lock();
try {
Set<ContainerId> currentContPreemption = Collections.unmodifiableSet(
new HashSet<ContainerId>(containersToPreempt));
containersToPreempt.clear();
@ -873,8 +873,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
public void setHeadroomProvider(
CapacityHeadroomProvider headroomProvider) {
try {
writeLock.lock();
try {
this.headroomProvider = headroomProvider;
} finally {
writeLock.unlock();
@ -883,8 +883,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
@Override
public Resource getHeadroom() {
try {
readLock.lock();
try {
if (headroomProvider != null) {
return headroomProvider.getHeadroom();
}
@ -898,8 +898,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
@Override
public void transferStateFromPreviousAttempt(
SchedulerApplicationAttempt appAttempt) {
try {
writeLock.lock();
try {
super.transferStateFromPreviousAttempt(appAttempt);
this.headroomProvider = ((FiCaSchedulerApp) appAttempt).headroomProvider;
} finally {
@ -926,8 +926,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
@VisibleForTesting
public RMContainer findNodeToUnreserve(FiCaSchedulerNode node,
SchedulerRequestKey schedulerKey, Resource minimumUnreservedResource) {
try {
readLock.lock();
try {
// need to unreserve some other container first
NodeId idToUnreserve = getNodeIdToUnreserve(schedulerKey,
minimumUnreservedResource, rc);
@ -1108,11 +1108,11 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
*/
@Override
public ApplicationResourceUsageReport getResourceUsageReport() {
writeLock.lock();
try {
// Use write lock here because
// SchedulerApplicationAttempt#getResourceUsageReport updated fields
// TODO: improve this
writeLock.lock();
ApplicationResourceUsageReport report = super.getResourceUsageReport();
Resource cluster = rmContext.getScheduler().getClusterResource();
Resource totalPartitionRes =
@ -1175,8 +1175,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
*/
public boolean moveReservation(RMContainer reservedContainer,
FiCaSchedulerNode sourceNode, FiCaSchedulerNode targetNode) {
try {
writeLock.lock();
try {
if (!sourceNode.getPartition().equals(targetNode.getPartition())) {
if (LOG.isDebugEnabled()) {
LOG.debug(

View File

@ -83,8 +83,8 @@ public class MemoryPlacementConstraintManager
Map<Set<String>, PlacementConstraint> constraintMap) {
// Check if app already exists. If not, prepare its constraint map.
Map<String, PlacementConstraint> constraintsForApp = new HashMap<>();
try {
readLock.lock();
try {
if (appConstraints.get(appId) != null) {
LOG.warn("Application {} has already been registered.", appId);
return;
@ -109,8 +109,8 @@ public class MemoryPlacementConstraintManager
appId);
}
// Update appConstraints.
try {
writeLock.lock();
try {
appConstraints.put(appId, constraintsForApp);
} finally {
writeLock.unlock();
@ -120,8 +120,8 @@ public class MemoryPlacementConstraintManager
@Override
public void addConstraint(ApplicationId appId, Set<String> sourceTags,
PlacementConstraint placementConstraint, boolean replace) {
try {
writeLock.lock();
try {
Map<String, PlacementConstraint> constraintsForApp =
appConstraints.get(appId);
if (constraintsForApp == null) {
@ -140,8 +140,8 @@ public class MemoryPlacementConstraintManager
@Override
public void addGlobalConstraint(Set<String> sourceTags,
PlacementConstraint placementConstraint, boolean replace) {
try {
writeLock.lock();
try {
addConstraintToMap(globalConstraints, sourceTags, placementConstraint,
replace);
} finally {
@ -181,8 +181,8 @@ public class MemoryPlacementConstraintManager
@Override
public Map<Set<String>, PlacementConstraint> getConstraints(
ApplicationId appId) {
try {
readLock.lock();
try {
if (appConstraints.get(appId) == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Application {} is not registered in the Placement "
@ -212,8 +212,8 @@ public class MemoryPlacementConstraintManager
return null;
}
String sourceTag = getValidSourceTag(sourceTags);
try {
readLock.lock();
try {
if (appConstraints.get(appId) == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Application {} is not registered in the Placement "
@ -235,8 +235,8 @@ public class MemoryPlacementConstraintManager
return null;
}
String sourceTag = getValidSourceTag(sourceTags);
try {
readLock.lock();
try {
return globalConstraints.get(sourceTag);
} finally {
readLock.unlock();
@ -284,8 +284,8 @@ public class MemoryPlacementConstraintManager
@Override
public void unregisterApplication(ApplicationId appId) {
try {
writeLock.lock();
try {
appConstraints.remove(appId);
} finally {
writeLock.unlock();
@ -298,8 +298,8 @@ public class MemoryPlacementConstraintManager
return;
}
String sourceTag = getValidSourceTag(sourceTags);
try {
writeLock.lock();
try {
globalConstraints.remove(sourceTag);
} finally {
writeLock.unlock();
@ -308,8 +308,8 @@ public class MemoryPlacementConstraintManager
@Override
public int getNumRegisteredApplications() {
try {
readLock.lock();
try {
return appConstraints.size();
} finally {
readLock.unlock();
@ -318,8 +318,8 @@ public class MemoryPlacementConstraintManager
@Override
public int getNumGlobalConstraints() {
try {
readLock.lock();
try {
return globalConstraints.size();
} finally {
readLock.unlock();

View File

@ -132,8 +132,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
void containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
try {
writeLock.lock();
try {
Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
@ -182,8 +182,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
private void unreserveInternal(
SchedulerRequestKey schedulerKey, FSSchedulerNode node) {
try {
writeLock.lock();
try {
Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
schedulerKey);
RMContainer reservedContainer = reservedContainers.remove(
@ -285,8 +285,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return NodeType.OFF_SWITCH;
}
try {
writeLock.lock();
try {
// Default level is NODE_LOCAL
if (!allowedLocalityLevel.containsKey(schedulerKey)) {
@ -355,8 +355,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return NodeType.OFF_SWITCH;
}
try {
writeLock.lock();
try {
// default level is NODE_LOCAL
if (!allowedLocalityLevel.containsKey(schedulerKey)) {
@ -426,8 +426,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
RMContainer rmContainer;
Container container;
try {
writeLock.lock();
try {
// Update allowed locality level
NodeType allowed = allowedLocalityLevel.get(schedulerKey);
if (allowed != null) {
@ -499,8 +499,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
void resetAllowedLocalityLevel(
SchedulerRequestKey schedulerKey, NodeType level) {
NodeType old;
try {
writeLock.lock();
try {
old = allowedLocalityLevel.put(schedulerKey, level);
} finally {
writeLock.unlock();
@ -665,9 +665,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
@Override
public synchronized void recoverContainer(SchedulerNode node,
RMContainer rmContainer) {
try {
writeLock.lock();
try {
super.recoverContainer(node, rmContainer);
if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
@ -777,8 +776,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
String rackName =
node.getRackName() == null ? "NULL" : node.getRackName();
try {
writeLock.lock();
try {
Set<String> rackReservations = reservations.get(rackName);
if (rackReservations == null) {
rackReservations = new HashSet<>();
@ -794,8 +793,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
String rackName =
node.getRackName() == null ? "NULL" : node.getRackName();
try {
writeLock.lock();
try {
Set<String> rackReservations = reservations.get(rackName);
if (rackReservations != null) {
rackReservations.remove(node.getNodeName());
@ -964,8 +963,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// For each priority, see if we can schedule a node local, rack local
// or off-switch request. Rack of off-switch requests may be delayed
// (not scheduled) in order to promote better locality.
try {
writeLock.lock();
try {
// TODO (wandga): All logics in this method should be added to
// SchedulerPlacement#canDelayTo which is independent from scheduler.

View File

@ -54,9 +54,9 @@ public class FifoAppAttempt extends FiCaSchedulerApp {
public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
SchedulerRequestKey schedulerKey, Container container) {
try {
writeLock.lock();
writeLock.lock();
try {
if (isStopped) {
return null;
}

View File

@ -155,9 +155,9 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
public PendingAskUpdateResult updatePendingAsk(
Collection<ResourceRequest> requests,
boolean recoverPreemptedRequestForAContainer) {
try {
this.writeLock.lock();
this.writeLock.lock();
try {
PendingAskUpdateResult updateResult = null;
// Update resource requests
@ -228,8 +228,8 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
@Override
public PendingAsk getPendingAsk(String resourceName) {
try {
readLock.lock();
try {
ResourceRequest request = getResourceRequest(resourceName);
if (null == request) {
return PendingAsk.ZERO;
@ -245,8 +245,8 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
@Override
public int getOutstandingAsksCount(String resourceName) {
try {
readLock.lock();
try {
ResourceRequest request = getResourceRequest(resourceName);
if (null == request) {
return 0;
@ -353,8 +353,8 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
@Override
public boolean canAllocate(NodeType type, SchedulerNode node) {
try {
readLock.lock();
try {
ResourceRequest r = resourceRequestMap.get(
ResourceRequest.ANY);
if (r == null || r.getNumContainers() <= 0) {
@ -381,8 +381,8 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
@Override
public boolean canDelayTo(String resourceName) {
try {
readLock.lock();
try {
ResourceRequest request = getResourceRequest(resourceName);
return request == null || request.getRelaxLocality();
} finally {
@ -432,8 +432,8 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
@Override
public ContainerRequest allocate(SchedulerRequestKey schedulerKey,
NodeType type, SchedulerNode node) {
try {
writeLock.lock();
try {
List<ResourceRequest> resourceRequests = new ArrayList<>();

View File

@ -363,8 +363,8 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
@Override
public boolean canAllocate(NodeType type, SchedulerNode node) {
try {
readLock.lock();
try {
return checkCardinalityAndPending(node);
} finally {
readLock.unlock();
@ -411,8 +411,8 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
@Override
public void showRequests() {
try {
readLock.lock();
try {
if (schedulingRequest != null) {
LOG.info(schedulingRequest.toString());
}

View File

@ -191,8 +191,8 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
public NMToken createAndGetNMToken(String applicationSubmitter,
ApplicationAttemptId appAttemptId, Container container) {
try {
this.writeLock.lock();
try {
HashSet<NodeId> nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId);
NMToken nmToken = null;
if (nodeSet != null) {
@ -213,8 +213,8 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
}
public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) {
try {
this.writeLock.lock();
try {
this.appAttemptToNodeKeyMap.put(appAttemptId, new HashSet<NodeId>());
} finally {
this.writeLock.unlock();
@ -225,8 +225,8 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
@VisibleForTesting
public boolean isApplicationAttemptRegistered(
ApplicationAttemptId appAttemptId) {
try {
this.readLock.lock();
try {
return this.appAttemptToNodeKeyMap.containsKey(appAttemptId);
} finally {
this.readLock.unlock();
@ -237,8 +237,8 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
@VisibleForTesting
public boolean isApplicationAttemptNMTokenPresent(
ApplicationAttemptId appAttemptId, NodeId nodeId) {
try {
this.readLock.lock();
try {
HashSet<NodeId> nodes = this.appAttemptToNodeKeyMap.get(appAttemptId);
if (nodes != null && nodes.contains(nodeId)) {
return true;
@ -251,8 +251,8 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
}
public void unregisterApplicationAttempt(ApplicationAttemptId appAttemptId) {
try {
this.writeLock.lock();
try {
this.appAttemptToNodeKeyMap.remove(appAttemptId);
} finally {
this.writeLock.unlock();
@ -265,8 +265,8 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
* @param nodeId
*/
public void removeNodeKey(NodeId nodeId) {
try {
this.writeLock.lock();
try {
Iterator<HashSet<NodeId>> appNodeKeySetIterator =
this.appAttemptToNodeKeyMap.values().iterator();
while (appNodeKeySetIterator.hasNext()) {

View File

@ -124,8 +124,8 @@ public class VolumeImpl implements Volume {
@Override
public VolumeState getVolumeState() {
try {
readLock.lock();
try {
return stateMachine.getCurrentState();
} finally {
readLock.unlock();
@ -134,8 +134,8 @@ public class VolumeImpl implements Volume {
@Override
public VolumeId getVolumeId() {
try {
readLock.lock();
try {
return this.volumeId;
} finally {
readLock.unlock();
@ -184,8 +184,8 @@ public class VolumeImpl implements Volume {
@Override
public void handle(VolumeEvent event) {
try {
this.writeLock.lock();
try {
VolumeId volumeId = event.getVolumeId();
if (volumeId == null) {