YARN-6519. Fix warnings from Spotbugs in hadoop-yarn-server-resourcemanager. Contributed by Weiwei Yang.
This commit is contained in:
parent
64f68cb0b8
commit
30fc580196
|
@ -393,7 +393,7 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
return hasApplicationMasterRegistered;
|
return hasApplicationMasterRegistered;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final static List<Container> EMPTY_CONTAINER_LIST =
|
private final static List<Container> EMPTY_CONTAINER_LIST =
|
||||||
new ArrayList<Container>();
|
new ArrayList<Container>();
|
||||||
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
|
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
|
||||||
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
|
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
|
||||||
|
|
|
@ -52,7 +52,6 @@ import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.LinkedHashSet;
|
import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -306,16 +305,12 @@ public class ProportionalCapacityPreemptionPolicy
|
||||||
|
|
||||||
private void cleanupStaledPreemptionCandidates(long currentTime) {
|
private void cleanupStaledPreemptionCandidates(long currentTime) {
|
||||||
// Keep the preemptionCandidates list clean
|
// Keep the preemptionCandidates list clean
|
||||||
for (Iterator<RMContainer> i = preemptionCandidates.keySet().iterator();
|
|
||||||
i.hasNext(); ) {
|
|
||||||
RMContainer id = i.next();
|
|
||||||
// garbage collect containers that are irrelevant for preemption
|
// garbage collect containers that are irrelevant for preemption
|
||||||
// And avoid preempt selected containers for *this execution*
|
// And avoid preempt selected containers for *this execution*
|
||||||
// or within 1 ms
|
// or within 1 ms
|
||||||
if (preemptionCandidates.get(id) + 2 * maxWaitTime < currentTime) {
|
preemptionCandidates.entrySet()
|
||||||
i.remove();
|
.removeIf(candidate ->
|
||||||
}
|
candidate.getValue() + 2 * maxWaitTime < currentTime);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private Set<String> getLeafQueueNames(TempQueuePerPartition q) {
|
private Set<String> getLeafQueueNames(TempQueuePerPartition q) {
|
||||||
|
|
|
@ -1000,9 +1000,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
// if am crashed and not received this response, we should resend
|
// if am crashed and not received this response, we should resend
|
||||||
// this msg again after am restart
|
// this msg again after am restart
|
||||||
if (!this.finishedContainersSentToAM.isEmpty()) {
|
if (!this.finishedContainersSentToAM.isEmpty()) {
|
||||||
for (NodeId nodeId : this.finishedContainersSentToAM.keySet()) {
|
for (Map.Entry<NodeId, List<ContainerStatus>> finishedContainer
|
||||||
|
: this.finishedContainersSentToAM.entrySet()) {
|
||||||
List<ContainerStatus> containerStatuses =
|
List<ContainerStatus> containerStatuses =
|
||||||
this.finishedContainersSentToAM.get(nodeId);
|
finishedContainer.getValue();
|
||||||
|
NodeId nodeId = finishedContainer.getKey();
|
||||||
this.justFinishedContainers.putIfAbsent(nodeId, new ArrayList<>());
|
this.justFinishedContainers.putIfAbsent(nodeId, new ArrayList<>());
|
||||||
this.justFinishedContainers.get(nodeId).addAll(containerStatuses);
|
this.justFinishedContainers.get(nodeId).addAll(containerStatuses);
|
||||||
}
|
}
|
||||||
|
|
|
@ -152,7 +152,7 @@ public class RMAppAttemptMetrics {
|
||||||
|
|
||||||
public void incNumAllocatedContainers(NodeType containerType,
|
public void incNumAllocatedContainers(NodeType containerType,
|
||||||
NodeType requestType) {
|
NodeType requestType) {
|
||||||
localityStatistics[containerType.index][requestType.index]++;
|
localityStatistics[containerType.getIndex()][requestType.getIndex()]++;
|
||||||
totalAllocatedContainers++;
|
totalAllocatedContainers++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -132,7 +132,7 @@ public abstract class AbstractYarnScheduler
|
||||||
protected int nmExpireInterval;
|
protected int nmExpireInterval;
|
||||||
protected long nmHeartbeatInterval;
|
protected long nmHeartbeatInterval;
|
||||||
|
|
||||||
protected final static List<Container> EMPTY_CONTAINER_LIST =
|
private final static List<Container> EMPTY_CONTAINER_LIST =
|
||||||
new ArrayList<Container>();
|
new ArrayList<Container>();
|
||||||
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
|
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
|
||||||
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
|
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
|
||||||
|
|
|
@ -23,9 +23,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
*/
|
*/
|
||||||
public enum NodeType {
|
public enum NodeType {
|
||||||
NODE_LOCAL(0), RACK_LOCAL(1), OFF_SWITCH(2);
|
NODE_LOCAL(0), RACK_LOCAL(1), OFF_SWITCH(2);
|
||||||
public int index;
|
|
||||||
|
|
||||||
private NodeType(int index) {
|
private final int index;
|
||||||
|
|
||||||
|
NodeType(int index) {
|
||||||
this.index = index;
|
this.index = index;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the index of the node type
|
||||||
|
*/
|
||||||
|
public int getIndex() {
|
||||||
|
return index;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,20 +142,29 @@ public class QueueMetrics implements MetricsSource {
|
||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
public synchronized static void clearQueueMetrics() {
|
public synchronized static void clearQueueMetrics() {
|
||||||
queueMetrics.clear();
|
QUEUE_METRICS.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simple metrics cache to help prevent re-registrations.
|
* Simple metrics cache to help prevent re-registrations.
|
||||||
*/
|
*/
|
||||||
protected final static Map<String, QueueMetrics> queueMetrics =
|
private static final Map<String, QueueMetrics> QUEUE_METRICS =
|
||||||
new HashMap<String, QueueMetrics>();
|
new HashMap<String, QueueMetrics>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the metrics cache to help prevent re-registrations.
|
||||||
|
*
|
||||||
|
* @return A string to {@link QueueMetrics} map.
|
||||||
|
*/
|
||||||
|
protected static Map<String, QueueMetrics> getQueueMetrics() {
|
||||||
|
return QUEUE_METRICS;
|
||||||
|
}
|
||||||
|
|
||||||
public synchronized
|
public synchronized
|
||||||
static QueueMetrics forQueue(MetricsSystem ms, String queueName,
|
static QueueMetrics forQueue(MetricsSystem ms, String queueName,
|
||||||
Queue parent, boolean enableUserMetrics,
|
Queue parent, boolean enableUserMetrics,
|
||||||
Configuration conf) {
|
Configuration conf) {
|
||||||
QueueMetrics metrics = queueMetrics.get(queueName);
|
QueueMetrics metrics = QUEUE_METRICS.get(queueName);
|
||||||
if (metrics == null) {
|
if (metrics == null) {
|
||||||
metrics =
|
metrics =
|
||||||
new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf).
|
new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf).
|
||||||
|
@ -168,7 +177,7 @@ public class QueueMetrics implements MetricsSource {
|
||||||
sourceName(queueName).toString(),
|
sourceName(queueName).toString(),
|
||||||
"Metrics for queue: " + queueName, metrics);
|
"Metrics for queue: " + queueName, metrics);
|
||||||
}
|
}
|
||||||
queueMetrics.put(queueName, metrics);
|
QUEUE_METRICS.put(queueName, metrics);
|
||||||
}
|
}
|
||||||
|
|
||||||
return metrics;
|
return metrics;
|
||||||
|
|
|
@ -115,7 +115,7 @@ public class CSQueueMetrics extends QueueMetrics {
|
||||||
public synchronized static CSQueueMetrics forQueue(String queueName,
|
public synchronized static CSQueueMetrics forQueue(String queueName,
|
||||||
Queue parent, boolean enableUserMetrics, Configuration conf) {
|
Queue parent, boolean enableUserMetrics, Configuration conf) {
|
||||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
QueueMetrics metrics = queueMetrics.get(queueName);
|
QueueMetrics metrics = QueueMetrics.getQueueMetrics().get(queueName);
|
||||||
if (metrics == null) {
|
if (metrics == null) {
|
||||||
metrics =
|
metrics =
|
||||||
new CSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
|
new CSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
|
||||||
|
@ -127,7 +127,7 @@ public class CSQueueMetrics extends QueueMetrics {
|
||||||
ms.register(sourceName(queueName).toString(), "Metrics for queue: "
|
ms.register(sourceName(queueName).toString(), "Metrics for queue: "
|
||||||
+ queueName, metrics);
|
+ queueName, metrics);
|
||||||
}
|
}
|
||||||
queueMetrics.put(queueName, metrics);
|
QueueMetrics.getQueueMetrics().put(queueName, metrics);
|
||||||
}
|
}
|
||||||
|
|
||||||
return (CSQueueMetrics) metrics;
|
return (CSQueueMetrics) metrics;
|
||||||
|
|
|
@ -69,9 +69,11 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
||||||
new Comparator<CSQueue>() {
|
new Comparator<CSQueue>() {
|
||||||
@Override
|
@Override
|
||||||
public int compare(CSQueue q1, CSQueue q2) {
|
public int compare(CSQueue q1, CSQueue q2) {
|
||||||
if (q1.getUsedCapacity() < q2.getUsedCapacity()) {
|
int result = Float.compare(q1.getUsedCapacity(),
|
||||||
|
q2.getUsedCapacity());
|
||||||
|
if (result < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
} else if (q1.getUsedCapacity() > q2.getUsedCapacity()) {
|
} else if (result > 0) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -206,7 +206,7 @@ public class FSQueueMetrics extends QueueMetrics {
|
||||||
public synchronized
|
public synchronized
|
||||||
static FSQueueMetrics forQueue(MetricsSystem ms, String queueName,
|
static FSQueueMetrics forQueue(MetricsSystem ms, String queueName,
|
||||||
Queue parent, boolean enableUserMetrics, Configuration conf) {
|
Queue parent, boolean enableUserMetrics, Configuration conf) {
|
||||||
QueueMetrics metrics = queueMetrics.get(queueName);
|
QueueMetrics metrics = QueueMetrics.getQueueMetrics().get(queueName);
|
||||||
if (metrics == null) {
|
if (metrics == null) {
|
||||||
metrics = new FSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
|
metrics = new FSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
|
||||||
.tag(QUEUE_INFO, queueName);
|
.tag(QUEUE_INFO, queueName);
|
||||||
|
@ -217,7 +217,7 @@ public class FSQueueMetrics extends QueueMetrics {
|
||||||
sourceName(queueName).toString(),
|
sourceName(queueName).toString(),
|
||||||
"Metrics for queue: " + queueName, metrics);
|
"Metrics for queue: " + queueName, metrics);
|
||||||
}
|
}
|
||||||
queueMetrics.put(queueName, metrics);
|
QueueMetrics.getQueueMetrics().put(queueName, metrics);
|
||||||
}
|
}
|
||||||
|
|
||||||
return (FSQueueMetrics)metrics;
|
return (FSQueueMetrics)metrics;
|
||||||
|
|
|
@ -155,10 +155,10 @@ public class FSSchedulerNode extends SchedulerNode {
|
||||||
* Remove apps that have their preemption requests fulfilled.
|
* Remove apps that have their preemption requests fulfilled.
|
||||||
*/
|
*/
|
||||||
private synchronized void cleanupPreemptionList() {
|
private synchronized void cleanupPreemptionList() {
|
||||||
Iterator<FSAppAttempt> iterator =
|
Iterator<Map.Entry<FSAppAttempt, Resource>> iterator =
|
||||||
resourcesPreemptedForApp.keySet().iterator();
|
resourcesPreemptedForApp.entrySet().iterator();
|
||||||
while(iterator.hasNext()) {
|
while(iterator.hasNext()) {
|
||||||
FSAppAttempt app = iterator.next();
|
FSAppAttempt app = iterator.next().getKey();
|
||||||
if (app.isStopped() || !app.isStarved()) {
|
if (app.isStopped() || !app.isStarved()) {
|
||||||
// App does not need more resources
|
// App does not need more resources
|
||||||
Resources.subtractFrom(totalResourcesPreempted,
|
Resources.subtractFrom(totalResourcesPreempted,
|
||||||
|
|
Loading…
Reference in New Issue