MAPREDUCE-2908. Fix all findbugs warnings. Contributed by Vinod K V.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1166838 13f79535-47bb-0310-9956-ffa450edef68
commit 1f46b991da
parent ae5e8e0104
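For orientation: the fixes in the diff below are mostly mechanical — an inner class made static, getters synchronized to match their synchronized writers, integer division cast to float, the capacity-scheduler Queue interface renamed to CSQueue — plus new <Match> entries in the findbugs-exclude files for warnings that are deliberate. A small illustrative sketch of the first two kinds of fix (hypothetical Example/Snapshot classes, not code from this patch):

    // Illustrative only: hypothetical classes, not part of this commit.
    public class Example {
        private int value; // written under synchronization below

        // An inner class that never touches the outer instance is declared
        // static, so it does not hold a hidden reference to Example.
        public static class Snapshot {
            private final long size;
            public Snapshot(long size) { this.size = size; }
            public long getSize() { return size; }
        }

        // Reads are synchronized like the writes, avoiding inconsistent
        // synchronization warnings on "value".
        public synchronized void setValue(int v) { this.value = v; }
        public synchronized int getValue() { return value; }
    }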
@@ -1225,6 +1225,8 @@ Release 0.23.0 - Unreleased
 
 MAPREDUCE-2948. Hadoop streaming test failure, post MR-2767 (mahadev)
 
+MAPREDUCE-2908. Fix all findbugs warnings. (vinodkv via acmurthy)
+
 Release 0.22.0 - Unreleased
 
 INCOMPATIBLE CHANGES
@@ -138,6 +138,11 @@
 <Method name="run" />
 <Bug pattern="DM_EXIT" />
 </Match>
+<Match>
+<Class name="org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal$DelegationTokenCancelThread" />
+<Method name="run" />
+<Bug pattern="DM_EXIT" />
+</Match>
 <!--
 We need to cast objects between old and new api objects
 -->
@@ -155,7 +160,8 @@
 </Match>
 <Match>
 <Class name="org.apache.hadoop.mapred.FileOutputCommitter" />
-<Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
+<Method name="commitJob" />
+<Bug pattern="NM_WRONG_PACKAGE" />
 </Match>
 <Match>
 <Class name="org.apache.hadoop.mapred.OutputCommitter" />
@@ -166,6 +172,14 @@
 </Or>
 <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
 </Match>
+<Match>
+<Class name="org.apache.hadoop.mapred.TaskCompletionEvent" />
+<Or>
+<Method name="setTaskStatus" />
+<Method name="setTaskAttemptId" />
+</Or>
+<Bug pattern="NM_WRONG_PACKAGE" />
+</Match>
 <Match>
 <Class name="org.apache.hadoop.mapred.lib.db.DBInputFormat$DBRecordReader" />
 <Method name="next" />
@@ -157,6 +157,7 @@ public class MRApps extends Apps {
 public static void setInitialClasspath(
 Map<String, String> environment) throws IOException {
 InputStream classpathFileStream = null;
+BufferedReader reader = null;
 try {
 // Get yarn mapreduce-app classpath from generated classpath
 // Works if compile time env is same as runtime. Mainly tests.
@@ -165,8 +166,7 @@ public class MRApps extends Apps {
 String mrAppGeneratedClasspathFile = "mrapp-generated-classpath";
 classpathFileStream =
 thisClassLoader.getResourceAsStream(mrAppGeneratedClasspathFile);
-BufferedReader reader =
-new BufferedReader(new InputStreamReader(classpathFileStream));
+reader = new BufferedReader(new InputStreamReader(classpathFileStream));
 String cp = reader.readLine();
 if (cp != null) {
 addToClassPath(environment, cp.trim());
@@ -198,6 +198,9 @@ public class MRApps extends Apps {
 if (classpathFileStream != null) {
 classpathFileStream.close();
 }
+if (reader != null) {
+reader.close();
+}
 }
 // TODO: Remove duplicates.
 }
@@ -868,15 +868,6 @@ public class JobClient extends CLI {
 Counters counters = Counters.downgrade(cntrs);
 return counters.findCounter(counterGroupName, counterName).getValue();
 }
 
-void displayJobList(JobStatus[] jobs) {
-System.out.printf("JobId\tState\tStartTime\tUserName\tQueue\tPriority\tSchedulingInfo\n");
-for (JobStatus job : jobs) {
-System.out.printf("%s\t%d\t%d\t%s\t%s\t%s\t%s\n", job.getJobID(), job.getRunState(),
-job.getStartTime(), job.getUsername(), job.getQueue(),
-job.getJobPriority().name(), job.getSchedulingInfo());
-}
-}
-
 /**
 * Get status information about the max available Maps in the cluster.
@@ -97,7 +97,7 @@ public abstract class ResourceCalculatorPlugin extends Configured {
 @InterfaceStability.Unstable
 public abstract ProcResourceValues getProcResourceValues();
 
-public class ProcResourceValues {
+public static class ProcResourceValues {
 private final long cumulativeCpuTime;
 private final long physicalMemorySize;
 private final long virtualMemorySize;
@@ -149,8 +149,7 @@ class ClientServiceDelegate {
 LOG.info("Connecting to " + serviceAddr);
 instantiateAMProxy(serviceAddr);
 return realProxy;
-} catch (Exception e) {
-//possibly
+} catch (IOException e) {
 //possibly the AM has crashed
 //there may be some time before AM is restarted
 //keep retrying by getting the address from RM
@@ -159,8 +158,13 @@ class ClientServiceDelegate {
 try {
 Thread.sleep(2000);
 } catch (InterruptedException e1) {
+LOG.warn("getProxy() call interruped", e1);
+throw new YarnException(e1);
 }
 application = rm.getApplicationReport(appId);
+} catch (InterruptedException e) {
+LOG.warn("getProxy() call interruped", e);
+throw new YarnException(e);
 }
 }
 
@@ -304,7 +308,6 @@ class ClientServiceDelegate {
 
 org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
 throws YarnRemoteException, YarnRemoteException {
-org.apache.hadoop.mapreduce.v2.api.records.JobId nJobID = TypeConverter.toYarn(jobID);
 GetTaskReportsRequest request = recordFactory.newRecordInstance(GetTaskReportsRequest.class);
 
 List<org.apache.hadoop.mapreduce.v2.api.records.TaskReport> taskReports =
@@ -263,10 +263,6 @@ public class ShuffleHandler extends AbstractService
 }
 }
 
-Shuffle createShuffle() {
-return new Shuffle(getConfig());
-}
-
 class HttpPipelineFactory implements ChannelPipelineFactory {
 
 final Shuffle SHUFFLE;
@@ -296,10 +292,12 @@ public class ShuffleHandler extends AbstractService
 private final IndexCache indexCache;
 private final LocalDirAllocator lDirAlloc =
 new LocalDirAllocator(NMConfig.NM_LOCAL_DIR);
+private final int port;
 
 public Shuffle(Configuration conf) {
 this.conf = conf;
 indexCache = new IndexCache(new JobConf(conf));
+this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
 }
 
 private List<String> splitMaps(List<String> mapq) {
@@ -362,7 +360,7 @@ public class ShuffleHandler extends AbstractService
 HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
 try {
 verifyRequest(jobId, ctx, request, response,
-new URL("http", "", port, reqUri));
+new URL("http", "", this.port, reqUri));
 } catch (IOException e) {
 LOG.warn("Shuffle failure ", e);
 sendError(ctx, e.getMessage(), UNAUTHORIZED);
@@ -65,6 +65,11 @@
 <Class name="~org\.apache\.hadoop\.yarn\.server\.resourcemanager\.rmnode\.RMNodeImpl.*" />
 <Bug pattern="BC_UNCONFIRMED_CAST" />
 </Match>
+<Match>
+<Class name="~org\.apache\.hadoop\.yarn\.server\.resourcemanager\.RMAppManager.*" />
+<Method name="handle" />
+<Bug pattern="BC_UNCONFIRMED_CAST" />
+</Match>
 <Match>
 <Class name="~org\.apache\.hadoop\.yarn\.server\.resourcemanager\.scheduler\.capacity\.CapacityScheduler.*" />
 <Method name="handle" />
@@ -97,7 +97,7 @@ public abstract class ResourceCalculatorPlugin extends Configured {
 @InterfaceStability.Unstable
 public abstract ProcResourceValues getProcResourceValues();
 
-public class ProcResourceValues {
+public static class ProcResourceValues {
 private final long cumulativeCpuTime;
 private final long physicalMemorySize;
 private final long virtualMemorySize;
@@ -67,7 +67,6 @@ public class WebApps {
 boolean findPort = false;
 Configuration conf;
 boolean devMode = false;
-Module[] modules;
 
 Builder(String name, Class<T> api, T application) {
 this.name = name;
@@ -99,11 +98,6 @@ public class WebApps {
 return this;
 }
 
-public Builder<T> with(Module... modules) {
-this.modules = modules; // OK
-return this;
-}
-
 public Builder<T> inDevMode() {
 devMode = true;
 return this;
@@ -311,7 +311,14 @@ public class ContainerManagerImpl extends CompositeService implements
 Container container = this.context.getContainers().get(containerID);
 if (container == null) {
 LOG.warn("Trying to stop unknown container " + containerID);
-NMAuditLogger.logFailure(container.getUser(),
+String userName;
+try {
+userName = UserGroupInformation.getCurrentUser().getUserName();
+} catch (IOException e) {
+LOG.error("Error finding userName", e);
+return response;
+}
+NMAuditLogger.logFailure(userName,
 AuditConstants.STOP_CONTAINER, "ContainerManagerImpl",
 "Trying to stop unknown container!",
 containerID.getAppId(), containerID);
@@ -18,32 +18,28 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.LinkedList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
-import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.util.StringUtils;
 
 /**
 * This class manages the list of applications for the resource manager.
@@ -154,7 +150,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent> {
 }
 }
 
-protected void setCompletedAppsMax(int max) {
+protected synchronized void setCompletedAppsMax(int max) {
 this.completedAppsMax = max;
 }
 
@@ -87,7 +87,7 @@ public class RMConfig {
 public static final String DEFAULT_RM_NODES_EXCLUDE_FILE = "";
 
 // the maximum number of completed applications RM keeps
-public static String EXPIRE_APPLICATIONS_COMPLETED_MAX =
+public static final String EXPIRE_APPLICATIONS_COMPLETED_MAX =
 YarnConfiguration.RM_PREFIX + "expire.applications.completed.max";
 public static final int DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX = 10000;
 }
@@ -123,10 +123,9 @@ public class ZKStore implements Store {
 public synchronized void storeNode(RMNode node) throws IOException {
 /** create a storage node and store it in zk **/
 if (!doneWithRecovery) return;
-NodeReportPBImpl nodeManagerInfo = createNodeManagerInfo(node);
-// TODO FinBugs - will be fixed after the subsequent fixme
-byte[] bytes = nodeManagerInfo.getProto().toByteArray();
 // TODO: FIXMEVinodkv
+// NodeReportPBImpl nodeManagerInfo = createNodeManagerInfo(node);
+// byte[] bytes = nodeManagerInfo.getProto().toByteArray();
 // try {
 // zkClient.create(NODES + Integer.toString(node.getNodeID().getId()), bytes, null,
 // CreateMode.PERSISTENT);
@@ -476,12 +475,12 @@ public class ZKStore implements Store {
 continue;
 }
 int httpPort = Integer.valueOf(m.group(1));
-// TODO: FindBugs Valid. Fix
-RMNode nm = new RMNodeImpl(node.getNodeId(), null,
-hostName, cmPort, httpPort,
-ResourceTrackerService.resolve(node.getNodeId().getHost()),
-node.getCapability());
-nodeManagers.add(nm);
+// TODO: FindBugs warns passing null below. Commenting this for later.
+// RMNode nm = new RMNodeImpl(node.getNodeId(), null,
+// hostName, cmPort, httpPort,
+// ResourceTrackerService.resolve(node.getNodeId().getHost()),
+// node.getCapability());
+// nodeManagers.add(nm);
 }
 readLastNodeId();
 /* make sure we get all the applications */
@@ -278,10 +278,7 @@ public class SchedulerApp {
 }
 
 synchronized public void resetSchedulingOpportunities(Priority priority) {
-Integer schedulingOpportunities =
-this.schedulingOpportunities.get(priority);
-schedulingOpportunities = 0;
-this.schedulingOpportunities.put(priority, schedulingOpportunities);
+this.schedulingOpportunities.put(priority, Integer.valueOf(0));
 }
 
 synchronized public void addSchedulingOpportunity(Priority priority) {
@@ -305,9 +302,7 @@ public class SchedulerApp {
 }
 
 synchronized void resetReReservations(Priority priority) {
-Integer reReservations = this.reReservations.get(priority);
-reReservations = 0;
-this.reReservations.put(priority, reReservations);
+this.reReservations.put(priority, Integer.valueOf(0));
 }
 
 synchronized void addReReservation(Priority priority) {
@@ -35,18 +35,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
 /**
-* Queue represents a node in the tree of
+* <code>CSQueue</code> represents a node in the tree of
 * hierarchical queues in the {@link CapacityScheduler}.
 */
 @Stable
 @Private
-public interface Queue
+public interface CSQueue
 extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
 /**
 * Get the parent <code>Queue</code>.
 * @return the parent queue
 */
-public Queue getParent();
+public CSQueue getParent();
 
 /**
 * Get the queue name.
@@ -122,7 +122,7 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
 * Get child queues
 * @return child queues
 */
-public List<Queue> getChildQueues();
+public List<CSQueue> getChildQueues();
 
 /**
 * Check if the <code>user</code> has permission to perform the operation
@@ -183,7 +183,7 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
 * @param queue new queue to re-initalize from
 * @param clusterResource resources in the cluster
 */
-public void reinitialize(Queue queue, Resource clusterResource)
+public void reinitialize(CSQueue queue, Resource clusterResource)
 throws IOException;
 
 /**
@@ -80,14 +80,14 @@ implements ResourceScheduler, CapacitySchedulerContext {
 
 private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
 
-private Queue root;
+private CSQueue root;
 
 private final static List<Container> EMPTY_CONTAINER_LIST =
 new ArrayList<Container>();
 
-static final Comparator<Queue> queueComparator = new Comparator<Queue>() {
+static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
 @Override
-public int compare(Queue q1, Queue q2) {
+public int compare(CSQueue q1, CSQueue q2) {
 if (q1.getUtilization() < q2.getUtilization()) {
 return -1;
 } else if (q1.getUtilization() > q2.getUtilization()) {
@@ -110,7 +110,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
 private ContainerTokenSecretManager containerTokenSecretManager;
 private RMContext rmContext;
 
-private Map<String, Queue> queues = new ConcurrentHashMap<String, Queue>();
+private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
 
 private Map<NodeId, SchedulerNode> nodes =
 new ConcurrentHashMap<NodeId, SchedulerNode>();
@@ -127,7 +127,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
 
 private boolean initialized = false;
 
-public Queue getRootQueue() {
+public CSQueue getRootQueue() {
 return root;
 }
 
@@ -207,7 +207,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
 CapacitySchedulerConfiguration.PREFIX + ROOT;
 
 static class QueueHook {
-public Queue hook(Queue queue) {
+public CSQueue hook(CSQueue queue) {
 return queue;
 }
 }
@@ -225,8 +225,8 @@ implements ResourceScheduler, CapacitySchedulerContext {
 private void reinitializeQueues(CapacitySchedulerConfiguration conf)
 throws IOException {
 // Parse new queues
-Map<String, Queue> newQueues = new HashMap<String, Queue>();
-Queue newRoot =
+Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
+CSQueue newRoot =
 parseQueue(this, conf, null, ROOT, newQueues, queues,
 queueComparator, applicationComparator, noop);
 
@@ -247,7 +247,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
 */
 @Lock(CapacityScheduler.class)
 private void validateExistingQueues(
-Map<String, Queue> queues, Map<String, Queue> newQueues)
+Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
 throws IOException {
 for (String queue : queues.keySet()) {
 if (!newQueues.containsKey(queue)) {
@@ -264,11 +264,11 @@ implements ResourceScheduler, CapacitySchedulerContext {
 */
 @Lock(CapacityScheduler.class)
 private void addNewQueues(
-Map<String, Queue> queues, Map<String, Queue> newQueues)
+Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
 {
-for (Map.Entry<String, Queue> e : newQueues.entrySet()) {
+for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) {
 String queueName = e.getKey();
-Queue queue = e.getValue();
+CSQueue queue = e.getValue();
 if (!queues.containsKey(queueName)) {
 queues.put(queueName, queue);
 }
@@ -276,15 +276,15 @@ implements ResourceScheduler, CapacitySchedulerContext {
 }
 
 @Lock(CapacityScheduler.class)
-static Queue parseQueue(
+static CSQueue parseQueue(
 CapacitySchedulerContext csContext,
 CapacitySchedulerConfiguration conf,
-Queue parent, String queueName, Map<String, Queue> queues,
-Map<String, Queue> oldQueues,
-Comparator<Queue> queueComparator,
+CSQueue parent, String queueName, Map<String, CSQueue> queues,
+Map<String, CSQueue> oldQueues,
+Comparator<CSQueue> queueComparator,
 Comparator<SchedulerApp> applicationComparator,
 QueueHook hook) {
-Queue queue;
+CSQueue queue;
 String[] childQueueNames =
 conf.getQueues((parent == null) ?
 queueName : (parent.getQueuePath()+"."+queueName));
@@ -306,9 +306,9 @@ implements ResourceScheduler, CapacitySchedulerContext {
 // Used only for unit tests
 queue = hook.hook(parentQueue);
 
-List<Queue> childQueues = new ArrayList<Queue>();
+List<CSQueue> childQueues = new ArrayList<CSQueue>();
 for (String childQueueName : childQueueNames) {
-Queue childQueue =
+CSQueue childQueue =
 parseQueue(csContext, conf, queue, childQueueName,
 queues, oldQueues, queueComparator, applicationComparator, hook);
 childQueues.add(childQueue);
@@ -322,7 +322,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
 return queue;
 }
 
-synchronized Queue getQueue(String queueName) {
+synchronized CSQueue getQueue(String queueName) {
 return queues.get(queueName);
 }
 
@@ -331,7 +331,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
 String queueName, String user) {
 
 // Sanity checks
-Queue queue = getQueue(queueName);
+CSQueue queue = getQueue(queueName);
 if (queue == null) {
 String message = "Application " + applicationAttemptId +
 " submitted by user " + user + " to unknown queue: " + queueName;
@@ -405,7 +405,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
 
 // Inform the queue
 String queueName = application.getQueue().getQueueName();
-Queue queue = queues.get(queueName);
+CSQueue queue = queues.get(queueName);
 if (!(queue instanceof LeafQueue)) {
 LOG.error("Cannot finish application " + "from non-leaf queue: "
 + queueName);
@@ -479,7 +479,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
 public QueueInfo getQueueInfo(String queueName,
 boolean includeChildQueues, boolean recursive)
 throws IOException {
-Queue queue = null;
+CSQueue queue = null;
 
 synchronized (this) {
 queue = this.queues.get(queueName);
@@ -64,11 +64,11 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
 
 @Private
 @Unstable
-public class LeafQueue implements Queue {
+public class LeafQueue implements CSQueue {
 private static final Log LOG = LogFactory.getLog(LeafQueue.class);
 
 private final String queueName;
-private Queue parent;
+private CSQueue parent;
 private float capacity;
 private float absoluteCapacity;
 private float maximumCapacity;
@@ -119,8 +119,8 @@ public class LeafQueue implements Queue {
 final static int DEFAULT_AM_RESOURCE = 2 * 1024;
 
 public LeafQueue(CapacitySchedulerContext cs,
-String queueName, Queue parent,
-Comparator<SchedulerApp> applicationComparator, Queue old) {
+String queueName, CSQueue parent,
+Comparator<SchedulerApp> applicationComparator, CSQueue old) {
 this.scheduler = cs;
 this.queueName = queueName;
 this.parent = parent;
@@ -192,7 +192,7 @@ public class LeafQueue implements Queue {
 float maxAMResourcePercent, float absoluteCapacity) {
 return
 Math.max(
-(int)((clusterResource.getMemory() / DEFAULT_AM_RESOURCE) *
+(int)((clusterResource.getMemory() / (float)DEFAULT_AM_RESOURCE) *
 maxAMResourcePercent * absoluteCapacity),
 1);
 }
@@ -271,7 +271,7 @@ public class LeafQueue implements Queue {
 }
 
 @Override
-public Queue getParent() {
+public CSQueue getParent() {
 return parent;
 }
 
@@ -313,15 +313,15 @@ public class LeafQueue implements Queue {
 return maxApplications;
 }
 
-public int getMaxApplicationsPerUser() {
+public synchronized int getMaxApplicationsPerUser() {
 return maxApplicationsPerUser;
 }
 
-public int getMaximumActiveApplications() {
+public synchronized int getMaximumActiveApplications() {
 return maxActiveApplications;
 }
 
-public int getMaximumActiveApplicationsPerUser() {
+public synchronized int getMaximumActiveApplicationsPerUser() {
 return maxActiveApplicationsPerUser;
 }
 
@@ -341,7 +341,7 @@ public class LeafQueue implements Queue {
 }
 
 @Override
-public List<Queue> getChildQueues() {
+public List<CSQueue> getChildQueues() {
 return null;
 }
 
@@ -381,7 +381,7 @@ public class LeafQueue implements Queue {
 this.userLimitFactor = userLimitFactor;
 }
 
-synchronized void setParentQueue(Queue parent) {
+synchronized void setParentQueue(CSQueue parent) {
 this.parent = parent;
 }
 
@@ -423,12 +423,12 @@ public class LeafQueue implements Queue {
 }
 
 @Private
-public int getUserLimit() {
+public synchronized int getUserLimit() {
 return userLimit;
 }
 
 @Private
-public float getUserLimitFactor() {
+public synchronized float getUserLimitFactor() {
 return userLimitFactor;
 }
 
@@ -480,7 +480,7 @@ public class LeafQueue implements Queue {
 }
 
 @Override
-public synchronized void reinitialize(Queue queue, Resource clusterResource)
+public synchronized void reinitialize(CSQueue queue, Resource clusterResource)
 throws IOException {
 // Sanity check
 if (!(queue instanceof LeafQueue) ||
@@ -493,9 +493,10 @@
 setupQueueConfigs(leafQueue.capacity, leafQueue.absoluteCapacity,
 leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity,
 leafQueue.userLimit, leafQueue.userLimitFactor,
-leafQueue.maxApplications, leafQueue.maxApplicationsPerUser,
-leafQueue.maxActiveApplications,
-leafQueue.maxActiveApplicationsPerUser,
+leafQueue.maxApplications,
+leafQueue.getMaxApplicationsPerUser(),
+leafQueue.getMaximumActiveApplications(),
+leafQueue.getMaximumActiveApplicationsPerUser(),
 leafQueue.state, leafQueue.acls);
 
 updateResource(clusterResource);
@@ -900,7 +901,7 @@ public class LeafQueue implements Queue {
 // Protect against corner case where you need the whole node with
 // Math.min(nodeFactor, minimumAllocationFactor)
 starvation =
-(int)((application.getReReservations(priority) / reservedContainers) *
+(int)((application.getReReservations(priority) / (float)reservedContainers) *
 (1.0f - (Math.min(nodeFactor, getMinimumAllocationFactor())))
 );
 
@@ -53,11 +53,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
 @Private
 @Evolving
-public class ParentQueue implements Queue {
+public class ParentQueue implements CSQueue {
 
 private static final Log LOG = LogFactory.getLog(ParentQueue.class);
 
-private final Queue parent;
+private final CSQueue parent;
 private final String queueName;
 
 private float capacity;
@@ -68,8 +68,8 @@ public class ParentQueue implements Queue {
 private float usedCapacity = 0.0f;
 private float utilization = 0.0f;
 
-private final Set<Queue> childQueues;
-private final Comparator<Queue> queueComparator;
+private final Set<CSQueue> childQueues;
+private final Comparator<CSQueue> queueComparator;
 
 private Resource usedResources =
 Resources.createResource(0);
@@ -94,7 +94,7 @@ public class ParentQueue implements Queue {
 RecordFactoryProvider.getRecordFactory(null);
 
 public ParentQueue(CapacitySchedulerContext cs,
-String queueName, Comparator<Queue> comparator, Queue parent, Queue old) {
+String queueName, Comparator<CSQueue> comparator, CSQueue parent, CSQueue old) {
 minimumAllocation = cs.getMinimumResourceCapability();
 
 this.parent = parent;
@@ -140,7 +140,7 @@ public class ParentQueue implements Queue {
 maximumCapacity, absoluteMaxCapacity, state, acls);
 
 this.queueComparator = comparator;
-this.childQueues = new TreeSet<Queue>(comparator);
+this.childQueues = new TreeSet<CSQueue>(comparator);
 
 LOG.info("Initialized parent-queue " + queueName +
 " name=" + queueName +
@@ -180,11 +180,11 @@ public class ParentQueue implements Queue {
 }
 
 private static float PRECISION = 0.005f; // 0.05% precision
-void setChildQueues(Collection<Queue> childQueues) {
+void setChildQueues(Collection<CSQueue> childQueues) {
 
 // Validate
 float childCapacities = 0;
-for (Queue queue : childQueues) {
+for (CSQueue queue : childQueues) {
 childCapacities += queue.getCapacity();
 }
 float delta = Math.abs(1.0f - childCapacities); // crude way to check
@@ -200,7 +200,7 @@ public class ParentQueue implements Queue {
 }
 
 @Override
-public Queue getParent() {
+public CSQueue getParent() {
 return parent;
 }
 
@@ -251,8 +251,8 @@ public class ParentQueue implements Queue {
 }
 
 @Override
-public synchronized List<Queue> getChildQueues() {
-return new ArrayList<Queue>(childQueues);
+public synchronized List<CSQueue> getChildQueues() {
+return new ArrayList<CSQueue>(childQueues);
 }
 
 public synchronized int getNumContainers() {
@@ -280,7 +280,7 @@ public class ParentQueue implements Queue {
 
 List<QueueInfo> childQueuesInfo = new ArrayList<QueueInfo>();
 if (includeChildQueues) {
-for (Queue child : childQueues) {
+for (CSQueue child : childQueues) {
 // Get queue information recursively?
 childQueuesInfo.add(
 child.getQueueInfo(recursive, recursive));
@@ -319,7 +319,7 @@ public class ParentQueue implements Queue {
 userAcls.add(getUserAclInfo(user));
 
 // Add children queue acls
-for (Queue child : childQueues) {
+for (CSQueue child : childQueues) {
 userAcls.addAll(child.getQueueUserAclInfo(user));
 }
 return userAcls;
@@ -333,7 +333,7 @@ public class ParentQueue implements Queue {
 }
 
 @Override
-public synchronized void reinitialize(Queue queue, Resource clusterResource)
+public synchronized void reinitialize(CSQueue queue, Resource clusterResource)
 throws IOException {
 // Sanity check
 if (!(queue instanceof ParentQueue) ||
@@ -346,13 +346,13 @@ public class ParentQueue implements Queue {
 
 // Re-configure existing child queues and add new ones
 // The CS has already checked to ensure all existing child queues are present!
-Map<String, Queue> currentChildQueues = getQueues(childQueues);
-Map<String, Queue> newChildQueues = getQueues(parentQueue.childQueues);
-for (Map.Entry<String, Queue> e : newChildQueues.entrySet()) {
+Map<String, CSQueue> currentChildQueues = getQueues(childQueues);
+Map<String, CSQueue> newChildQueues = getQueues(parentQueue.childQueues);
+for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
 String newChildQueueName = e.getKey();
-Queue newChildQueue = e.getValue();
+CSQueue newChildQueue = e.getValue();
 
-Queue childQueue = currentChildQueues.get(newChildQueueName);
+CSQueue childQueue = currentChildQueues.get(newChildQueueName);
 if (childQueue != null){
 childQueue.reinitialize(newChildQueue, clusterResource);
 LOG.info(getQueueName() + ": re-configured queue: " + childQueue);
@@ -375,9 +375,9 @@ public class ParentQueue implements Queue {
 updateResource(clusterResource);
 }
 
-Map<String, Queue> getQueues(Set<Queue> queues) {
-Map<String, Queue> queuesMap = new HashMap<String, Queue>();
-for (Queue queue : queues) {
+Map<String, CSQueue> getQueues(Set<CSQueue> queues) {
+Map<String, CSQueue> queuesMap = new HashMap<String, CSQueue>();
+for (CSQueue queue : queues) {
 queuesMap.put(queue.getQueueName(), queue);
 }
 return queuesMap;
@@ -568,8 +568,8 @@ public class ParentQueue implements Queue {
 printChildQueues();
 
 // Try to assign to most 'under-served' sub-queue
-for (Iterator<Queue> iter=childQueues.iterator(); iter.hasNext();) {
-Queue childQueue = iter.next();
+for (Iterator<CSQueue> iter=childQueues.iterator(); iter.hasNext();) {
+CSQueue childQueue = iter.next();
 LOG.info("DEBUG --- Trying to assign to" +
 " queue: " + childQueue.getQueuePath() +
 " stats: " + childQueue);
@@ -595,7 +595,7 @@ public class ParentQueue implements Queue {
 
 String getChildQueuesToPrint() {
 StringBuilder sb = new StringBuilder();
-for (Queue q : childQueues) {
+for (CSQueue q : childQueues) {
 sb.append(q.getQueuePath() + "(" + q.getUtilization() + "), ");
 }
 return sb.toString();
@@ -648,7 +648,7 @@ public class ParentQueue implements Queue {
 @Override
 public synchronized void updateClusterResource(Resource clusterResource) {
 // Update all children
-for (Queue childQueue : childQueues) {
+for (CSQueue childQueue : childQueues) {
 childQueue.updateClusterResource(clusterResource);
 }
 }
@@ -24,7 +24,7 @@ import com.google.inject.servlet.RequestScoped;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.webapp.SubView;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
@@ -43,7 +43,7 @@ class CapacitySchedulerPage extends RmView {
 
 @RequestScoped
 static class Parent {
-Queue queue;
+CSQueue queue;
 }
 
 public static class QueueBlock extends HtmlBlock {
@@ -56,8 +56,8 @@ class CapacitySchedulerPage extends RmView {
 @Override
 public void render(Block html) {
 UL<Hamlet> ul = html.ul();
-Queue parentQueue = parent.queue;
-for (Queue queue : parentQueue.getChildQueues()) {
+CSQueue parentQueue = parent.queue;
+for (CSQueue queue : parentQueue.getChildQueues()) {
 float used = queue.getUsedCapacity();
 float set = queue.getCapacity();
 float delta = Math.abs(set - used) + 0.001f;
@@ -109,7 +109,7 @@ class CapacitySchedulerPage extends RmView {
 span().$style(Q_END)._("100% ")._().
 span(".q", "default")._()._();
 } else {
-Queue root = cs.getRootQueue();
+CSQueue root = cs.getRootQueue();
 parent.queue = root;
 float used = root.getUsedCapacity();
 float set = root.getCapacity();
@@ -38,8 +38,8 @@ public class TestApplicationLimits {
 when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB));
 when(csContext.getClusterResources()).thenReturn(Resources.createResource(10 * 16 * GB));
 
-Map<String, Queue> queues = new HashMap<String, Queue>();
-Queue root =
+Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+CSQueue root =
 CapacityScheduler.parseQueue(csContext, csConf, null, "root",
 queues, queues,
 CapacityScheduler.queueComparator,
@@ -108,8 +108,8 @@ public class TestApplicationLimits {
 Resource clusterResource = Resources.createResource(100 * 16 * GB);
 when(csContext.getClusterResources()).thenReturn(clusterResource);
 
-Map<String, Queue> queues = new HashMap<String, Queue>();
-Queue root =
+Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+CSQueue root =
 CapacityScheduler.parseQueue(csContext, csConf, null, "root",
 queues, queues,
 CapacityScheduler.queueComparator,
@@ -65,8 +65,8 @@ public class TestLeafQueue {
 CapacitySchedulerConfiguration csConf;
 CapacitySchedulerContext csContext;
 
-Queue root;
-Map<String, Queue> queues = new HashMap<String, Queue>();
+CSQueue root;
+Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
 
 final static int GB = 1024;
 final static String DEFAULT_RACK = "/default";
@@ -145,7 +145,7 @@ public class TestLeafQueue {
 any(Resource.class));
 
 // 2. Stub out LeafQueue.parent.completedContainer
-Queue parent = queue.getParent();
+CSQueue parent = queue.getParent();
 doNothing().when(parent).completedContainer(
 any(Resource.class), any(SchedulerApp.class), any(SchedulerNode.class),
 any(RMContainer.class), any(RMContainerEventType.class));
@@ -81,7 +81,7 @@ public class TestParentQueue {
 LOG.info("Setup top-level queues a and b");
 }
 
-private void stubQueueAllocation(final Queue queue,
+private void stubQueueAllocation(final CSQueue queue,
 final Resource clusterResource, final SchedulerNode node,
 final int allocation) {
 
@@ -121,7 +121,7 @@ public class TestParentQueue {
 when(queue).assignContainers(eq(clusterResource), eq(node));
 }
 
-private float computeQueueUtilization(Queue queue,
+private float computeQueueUtilization(CSQueue queue,
 int expectedMemory, Resource clusterResource) {
 return (expectedMemory /
 (clusterResource.getMemory() * queue.getAbsoluteCapacity()));
@@ -132,8 +132,8 @@ public class TestParentQueue {
 // Setup queue configs
 setupSingleLevelQueues(csConf);
 
-Map<String, Queue> queues = new HashMap<String, Queue>();
-Queue root =
+Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+CSQueue root =
 CapacityScheduler.parseQueue(csContext, csConf, null,
 CapacityScheduler.ROOT, queues, queues,
 CapacityScheduler.queueComparator,
@@ -270,8 +270,8 @@ public class TestParentQueue {
 // Setup queue configs
 setupMultiLevelQueues(csConf);
 
-Map<String, Queue> queues = new HashMap<String, Queue>();
-Queue root =
+Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+CSQueue root =
 CapacityScheduler.parseQueue(csContext, csConf, null,
 CapacityScheduler.ROOT, queues, queues,
 CapacityScheduler.queueComparator,
@@ -294,17 +294,17 @@ public class TestParentQueue {
 when(csContext.getNumClusterNodes()).thenReturn(numNodes);
 
 // Start testing
-Queue a = queues.get(A);
-Queue b = queues.get(B);
-Queue c = queues.get(C);
-Queue d = queues.get(D);
+CSQueue a = queues.get(A);
+CSQueue b = queues.get(B);
+CSQueue c = queues.get(C);
+CSQueue d = queues.get(D);
 
-Queue a1 = queues.get(A1);
-Queue a2 = queues.get(A2);
+CSQueue a1 = queues.get(A1);
+CSQueue a2 = queues.get(A2);
 
-Queue b1 = queues.get(B1);
-Queue b2 = queues.get(B2);
-Queue b3 = queues.get(B3);
+CSQueue b1 = queues.get(B1);
+CSQueue b2 = queues.get(B2);
+CSQueue b3 = queues.get(B3);
 
 final float delta = 0.0001f;
 
@@ -85,7 +85,7 @@ public class TestUtils {
 */
 static class SpyHook extends CapacityScheduler.QueueHook {
 @Override
-public Queue hook(Queue queue) {
+public CSQueue hook(CSQueue queue) {
 return spy(queue);
 }
 }
@@ -388,9 +388,4 @@
 <Field name="started" />
 <Bug pattern="IS2_INCONSISTENT_SYNC" />
 </Match>
-<Match>
-<Class name="org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal$DelegationTokenCancelThread" />
-<Method name="run" />
-<Bug pattern="DM_EXIT" />
-</Match>
 </FindBugsFilter>