diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java index c50e9a3cc06..c7265ea2dd7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java @@ -20,15 +20,19 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSInputStream; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.security.UserGroupInformation; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -41,15 +45,52 @@ class DFSClientCache { private static final Log LOG = LogFactory.getLog(DFSClientCache.class); /** - * Cache that maps User id to corresponding DFSClient. + * Cache that maps User id to the corresponding DFSClient. */ @VisibleForTesting final LoadingCache clientCache; final static int DEFAULT_DFS_CLIENT_CACHE_SIZE = 256; + /** + * Cache that maps to the corresponding + * FSDataInputStream. + */ + final LoadingCache inputstreamCache; + + /** + * Time to live for a DFSClient (in seconds) + */ + final static int DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE = 1024; + final static int DEFAULT_DFS_INPUTSTREAM_CACHE_TTL = 10 * 60; + private final Configuration config; + private static class DFSInputStreamCaheKey { + final String userId; + final String inodePath; + + private DFSInputStreamCaheKey(String userId, String inodePath) { + super(); + this.userId = userId; + this.inodePath = inodePath; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof DFSInputStreamCaheKey) { + DFSInputStreamCaheKey k = (DFSInputStreamCaheKey) obj; + return userId.equals(k.userId) && inodePath.equals(k.inodePath); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(userId, inodePath); + } + } + DFSClientCache(Configuration config) { this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE); } @@ -60,6 +101,12 @@ class DFSClientCache { .maximumSize(clientCache) .removalListener(clientRemovealListener()) .build(clientLoader()); + + this.inputstreamCache = CacheBuilder.newBuilder() + .maximumSize(DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE) + .expireAfterAccess(DEFAULT_DFS_INPUTSTREAM_CACHE_TTL, TimeUnit.SECONDS) + .removalListener(inputStreamRemovalListener()) + .build(inputStreamLoader()); } private CacheLoader clientLoader() { @@ -95,7 +142,33 @@ public void onRemoval(RemovalNotification notification) { }; } - DFSClient get(String userName) { + private RemovalListener inputStreamRemovalListener() { + return new RemovalListener() { + + @Override + public void onRemoval( + RemovalNotification notification) { + try { + notification.getValue().close(); + } catch (IOException e) { + } + } + }; + } + + private CacheLoader inputStreamLoader() { + return new CacheLoader() { + + @Override + public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception { + DFSClient client = getDfsClient(key.userId); + DFSInputStream dis = client.open(key.inodePath); + return new FSDataInputStream(dis); + } + }; + } + + DFSClient getDfsClient(String userName) { DFSClient client = null; try { client = clientCache.get(userName); @@ -105,4 +178,21 @@ DFSClient get(String userName) { } return client; } + + FSDataInputStream getDfsInputStream(String userName, String inodePath) { + DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath); + FSDataInputStream s = null; + try { + s = inputstreamCache.get(k); + } catch (ExecutionException e) { + LOG.warn("Failed to create DFSInputStream for user:" + userName + + " Cause:" + e); + } + return s; + } + + public void invalidateDfsInputStream(String userName, String inodePath) { + DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath); + inputstreamCache.invalidate(k); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java index 554f702dfd6..612171e084f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java @@ -39,7 +39,6 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSInputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -235,7 +234,7 @@ public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -310,7 +309,7 @@ private void setattrInternal(DFSClient dfsClient, String fileIdPath, public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler, InetAddress client) { SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -392,7 +391,7 @@ public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -454,7 +453,7 @@ public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -502,7 +501,7 @@ public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -563,13 +562,14 @@ public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler, public READ3Response read(XDR xdr, SecurityHandler securityHandler, InetAddress client) { READ3Response response = new READ3Response(Nfs3Status.NFS3_OK); + final String userName = securityHandler.getUser(); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { response.setStatus(Nfs3Status.NFS3ERR_ACCES); return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(userName); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -628,11 +628,28 @@ public READ3Response read(XDR xdr, SecurityHandler securityHandler, int buffSize = Math.min(MAX_READ_TRANSFER_SIZE, count); byte[] readbuffer = new byte[buffSize]; - DFSInputStream is = dfsClient.open(Nfs3Utils.getFileIdPath(handle)); - FSDataInputStream fis = new FSDataInputStream(is); - - int readCount = fis.read(offset, readbuffer, 0, count); - fis.close(); + int readCount = 0; + /** + * Retry exactly once because the DFSInputStream can be stale. + */ + for (int i = 0; i < 1; ++i) { + FSDataInputStream fis = clientCache.getDfsInputStream(userName, + Nfs3Utils.getFileIdPath(handle)); + + try { + readCount = fis.read(offset, readbuffer, 0, count); + } catch (IOException e) { + // TODO: A cleaner way is to throw a new type of exception + // which requires incompatible changes. + if (e.getMessage() == "Stream closed") { + clientCache.invalidateDfsInputStream(userName, + Nfs3Utils.getFileIdPath(handle)); + continue; + } else { + throw e; + } + } + } attrs = Nfs3Utils.getFileAttr(dfsClient, Nfs3Utils.getFileIdPath(handle), iug); @@ -660,7 +677,7 @@ public WRITE3Response write(XDR xdr, Channel channel, int xid, SecurityHandler securityHandler, InetAddress client) { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -735,7 +752,7 @@ public WRITE3Response write(XDR xdr, Channel channel, int xid, public CREATE3Response create(XDR xdr, SecurityHandler securityHandler, InetAddress client) { CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -858,7 +875,7 @@ preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), public MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler, InetAddress client) { MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -954,7 +971,7 @@ public READDIR3Response mknod(XDR xdr, public REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler, InetAddress client) { REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -1029,7 +1046,7 @@ public REMOVE3Response remove(XDR xdr, public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler, InetAddress client) { RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -1111,7 +1128,7 @@ public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler, public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler, InetAddress client) { RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -1205,7 +1222,7 @@ public SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -1293,7 +1310,7 @@ public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -1430,7 +1447,7 @@ public READDIRPLUS3Response readdirplus(XDR xdr, return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES); } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT); } @@ -1587,7 +1604,7 @@ public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -1645,7 +1662,7 @@ public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -1697,7 +1714,7 @@ public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -1738,7 +1755,7 @@ public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler, public COMMIT3Response commit(XDR xdr, Channel channel, int xid, SecurityHandler securityHandler, InetAddress client) { COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java index c550c1e02b1..360bb145fee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java @@ -39,12 +39,12 @@ public void testEviction() throws IOException { DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE); - DFSClient c1 = cache.get("test1"); - assertTrue(cache.get("test1").toString().contains("ugi=test1")); - assertEquals(c1, cache.get("test1")); + DFSClient c1 = cache.getDfsClient("test1"); + assertTrue(cache.getDfsClient("test1").toString().contains("ugi=test1")); + assertEquals(c1, cache.getDfsClient("test1")); assertFalse(isDfsClientClose(c1)); - cache.get("test2"); + cache.getDfsClient("test2"); assertTrue(isDfsClientClose(c1)); assertEquals(MAX_CACHE_SIZE - 1, cache.clientCache.size()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 43c8dca5ad7..0e8d274f9e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -440,6 +440,9 @@ Release 2.2.1 - UNRELEASED HDFS-5403. WebHdfs client cannot communicate with older WebHdfs servers post HDFS-5306. (atm) + HDFS-5171. NFS should create input stream for a file and try to share it + with multiple read requests. (Haohui Mai via brandonli) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 44f6fec31b5..79a0feb3fa3 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -108,6 +108,9 @@ Release 2.2.1 - UNRELEASED YARN-1315. TestQueueACLs should also test FairScheduler (Sandy Ryza) + YARN-1335. Move duplicate code from FSSchedulerApp and FiCaSchedulerApp + into SchedulerApplication (Sandy Ryza) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java index cc9b872724d..ade40c16c74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -17,44 +17,385 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; /** - * Represents an Application from the viewpoint of the scheduler. - * Each running Application in the RM corresponds to one instance + * Represents an application attempt from the viewpoint of the scheduler. + * Each running app attempt in the RM corresponds to one instance * of this class. */ @Private @Unstable public abstract class SchedulerApplication { + + private static final Log LOG = LogFactory.getLog(SchedulerApplication.class); + + protected final AppSchedulingInfo appSchedulingInfo; + + protected final Map liveContainers = + new HashMap(); + protected final Map> reservedContainers = + new HashMap>(); + + private final Multiset reReservations = HashMultiset.create(); + + protected final Resource currentReservation = Resource.newInstance(0, 0); + private Resource resourceLimit = Resource.newInstance(0, 0); + protected final Resource currentConsumption = Resource.newInstance(0, 0); + + protected List newlyAllocatedContainers = + new ArrayList(); /** - * Get {@link ApplicationAttemptId} of the application master. - * @return ApplicationAttemptId of the application master + * Count how many times the application has been given an opportunity + * to schedule a task at each priority. Each time the scheduler + * asks the application for a task at this priority, it is incremented, + * and each time the application successfully schedules a task, it + * is reset to 0. */ - public abstract ApplicationAttemptId getApplicationAttemptId(); + Multiset schedulingOpportunities = HashMultiset.create(); + + // Time of the last container scheduled at the current allowed level + protected Map lastScheduledContainer = + new HashMap(); + + protected final Queue queue; + protected boolean isStopped = false; + + protected final RMContext rmContext; + + public SchedulerApplication(ApplicationAttemptId applicationAttemptId, + String user, Queue queue, ActiveUsersManager activeUsersManager, + RMContext rmContext) { + this.rmContext = rmContext; + this.appSchedulingInfo = + new AppSchedulingInfo(applicationAttemptId, user, queue, + activeUsersManager); + this.queue = queue; + } /** * Get the live containers of the application. * @return live containers of the application */ - public abstract Collection getLiveContainers(); - - /** - * Get the reserved containers of the application. - * @return the reserved containers of the application - */ - public abstract Collection getReservedContainers(); + public synchronized Collection getLiveContainers() { + return new ArrayList(liveContainers.values()); + } /** * Is this application pending? * @return true if it is else false. */ - public abstract boolean isPending(); + public boolean isPending() { + return appSchedulingInfo.isPending(); + } + + /** + * Get {@link ApplicationAttemptId} of the application master. + * @return ApplicationAttemptId of the application master + */ + public ApplicationAttemptId getApplicationAttemptId() { + return appSchedulingInfo.getApplicationAttemptId(); + } + + public ApplicationId getApplicationId() { + return appSchedulingInfo.getApplicationId(); + } + + public String getUser() { + return appSchedulingInfo.getUser(); + } + + public Map getResourceRequests(Priority priority) { + return appSchedulingInfo.getResourceRequests(priority); + } + + public int getNewContainerId() { + return appSchedulingInfo.getNewContainerId(); + } + + public Collection getPriorities() { + return appSchedulingInfo.getPriorities(); + } + + public ResourceRequest getResourceRequest(Priority priority, String resourceName) { + return this.appSchedulingInfo.getResourceRequest(priority, resourceName); + } + + public synchronized int getTotalRequiredResources(Priority priority) { + return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers(); + } + + public Resource getResource(Priority priority) { + return appSchedulingInfo.getResource(priority); + } + + public String getQueueName() { + return appSchedulingInfo.getQueueName(); + } + + public synchronized RMContainer getRMContainer(ContainerId id) { + return liveContainers.get(id); + } + + protected synchronized void resetReReservations(Priority priority) { + reReservations.setCount(priority, 0); + } + + protected synchronized void addReReservation(Priority priority) { + reReservations.add(priority); + } + + public synchronized int getReReservations(Priority priority) { + return reReservations.count(priority); + } + + /** + * Get total current reservations. + * Used only by unit tests + * @return total current reservations + */ + @Stable + @Private + public synchronized Resource getCurrentReservation() { + return currentReservation; + } + + public Queue getQueue() { + return queue; + } + + public synchronized void updateResourceRequests( + List requests) { + if (!isStopped) { + appSchedulingInfo.updateResourceRequests(requests); + } + } + + public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { + // Cleanup all scheduling information + isStopped = true; + appSchedulingInfo.stop(rmAppAttemptFinalState); + } + + public synchronized boolean isStopped() { + return isStopped; + } + + /** + * Get the list of reserved containers + * @return All of the reserved containers. + */ + public synchronized List getReservedContainers() { + List reservedContainers = new ArrayList(); + for (Map.Entry> e : + this.reservedContainers.entrySet()) { + reservedContainers.addAll(e.getValue().values()); + } + return reservedContainers; + } + + public synchronized RMContainer reserve(SchedulerNode node, Priority priority, + RMContainer rmContainer, Container container) { + // Create RMContainer if necessary + if (rmContainer == null) { + rmContainer = + new RMContainerImpl(container, getApplicationAttemptId(), + node.getNodeID(), rmContext.getDispatcher().getEventHandler(), + rmContext.getContainerAllocationExpirer()); + + Resources.addTo(currentReservation, container.getResource()); + + // Reset the re-reservation count + resetReReservations(priority); + } else { + // Note down the re-reservation + addReReservation(priority); + } + rmContainer.handle(new RMContainerReservedEvent(container.getId(), + container.getResource(), node.getNodeID(), priority)); + + Map reservedContainers = + this.reservedContainers.get(priority); + if (reservedContainers == null) { + reservedContainers = new HashMap(); + this.reservedContainers.put(priority, reservedContainers); + } + reservedContainers.put(node.getNodeID(), rmContainer); + + LOG.info("Application " + getApplicationId() + + " reserved container " + rmContainer + + " on node " + node + ", currently has " + reservedContainers.size() + + " at priority " + priority + + "; currentReservation " + currentReservation.getMemory()); + + return rmContainer; + } + + /** + * Has the application reserved the given node at the + * given priority? + * @param node node to be checked + * @param priority priority of reserved container + * @return true is reserved, false if not + */ + public synchronized boolean isReserved(SchedulerNode node, Priority priority) { + Map reservedContainers = + this.reservedContainers.get(priority); + if (reservedContainers != null) { + return reservedContainers.containsKey(node.getNodeID()); + } + return false; + } + + public synchronized void setHeadroom(Resource globalLimit) { + this.resourceLimit = globalLimit; + } + + /** + * Get available headroom in terms of resources for the application's user. + * @return available resource headroom + */ + public synchronized Resource getHeadroom() { + // Corner case to deal with applications being slightly over-limit + if (resourceLimit.getMemory() < 0) { + resourceLimit.setMemory(0); + } + + return resourceLimit; + } + + public synchronized int getNumReservedContainers(Priority priority) { + Map reservedContainers = + this.reservedContainers.get(priority); + return (reservedContainers == null) ? 0 : reservedContainers.size(); + } + + @SuppressWarnings("unchecked") + public synchronized void containerLaunchedOnNode(ContainerId containerId, + NodeId nodeId) { + // Inform the container + RMContainer rmContainer = getRMContainer(containerId); + if (rmContainer == null) { + // Some unknown container sneaked into the system. Kill it. + rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(nodeId, containerId)); + return; + } + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.LAUNCHED)); + } + + public synchronized void showRequests() { + if (LOG.isDebugEnabled()) { + for (Priority priority : getPriorities()) { + Map requests = getResourceRequests(priority); + if (requests != null) { + LOG.debug("showRequests:" + " application=" + getApplicationId() + + " headRoom=" + getHeadroom() + + " currentConsumption=" + currentConsumption.getMemory()); + for (ResourceRequest request : requests.values()) { + LOG.debug("showRequests:" + " application=" + getApplicationId() + + " request=" + request); + } + } + } + } + } + + public Resource getCurrentConsumption() { + return currentConsumption; + } + + public synchronized List pullNewlyAllocatedContainers() { + List returnContainerList = new ArrayList( + newlyAllocatedContainers.size()); + for (RMContainer rmContainer : newlyAllocatedContainers) { + rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), + RMContainerEventType.ACQUIRED)); + returnContainerList.add(rmContainer.getContainer()); + } + newlyAllocatedContainers.clear(); + return returnContainerList; + } + + public synchronized void updateBlacklist( + List blacklistAdditions, List blacklistRemovals) { + if (!isStopped) { + this.appSchedulingInfo.updateBlacklist( + blacklistAdditions, blacklistRemovals); + } + } + + public boolean isBlacklisted(String resourceName) { + return this.appSchedulingInfo.isBlacklisted(resourceName); + } + + public synchronized void addSchedulingOpportunity(Priority priority) { + schedulingOpportunities.setCount(priority, + schedulingOpportunities.count(priority) + 1); + } + + public synchronized void subtractSchedulingOpportunity(Priority priority) { + int count = schedulingOpportunities.count(priority) - 1; + this.schedulingOpportunities.setCount(priority, Math.max(count, 0)); + } + + /** + * Return the number of times the application has been given an opportunity + * to schedule a task at the given priority since the last time it + * successfully did so. + */ + public synchronized int getSchedulingOpportunities(Priority priority) { + return schedulingOpportunities.count(priority); + } + + /** + * Should be called when an application has successfully scheduled a container, + * or when the scheduling locality threshold is relaxed. + * Reset various internal counters which affect delay scheduling + * + * @param priority The priority of the container scheduled. + */ + public synchronized void resetSchedulingOpportunities(Priority priority) { + resetSchedulingOpportunities(priority, System.currentTimeMillis()); + } + // used for continuous scheduling + public synchronized void resetSchedulingOpportunities(Priority priority, + long currentTimeMs) { + lastScheduledContainer.put(priority, currentTimeMs); + schedulingOpportunities.setCount(priority, 0); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 2974b9dc05a..05872f9a31f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -20,6 +20,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -72,4 +73,11 @@ public abstract class SchedulerNode { * @return total resources on the node. */ public abstract Resource getTotalResource(); + + /** + * Get the ID of the node which contains both its hostname and port. + * @return the ID of the node + */ + public abstract NodeId getNodeID(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index b93965cdc35..7f51126fec8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -18,22 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -41,194 +35,39 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import com.google.common.collect.HashMultiset; -import com.google.common.collect.Multiset; - /** - * Represents an Application from the viewpoint of the scheduler. - * Each running Application in the RM corresponds to one instance - * of this class. + * Represents an application attempt from the viewpoint of the FIFO or Capacity + * scheduler. */ -@SuppressWarnings("unchecked") @Private @Unstable public class FiCaSchedulerApp extends SchedulerApplication { private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class); - private final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); - - private final AppSchedulingInfo appSchedulingInfo; - private final Queue queue; - - private final Resource currentConsumption = recordFactory - .newRecordInstance(Resource.class); - private Resource resourceLimit = recordFactory - .newRecordInstance(Resource.class); - - private Map liveContainers = - new HashMap(); - private List newlyAllocatedContainers = - new ArrayList(); - - final Map> reservedContainers = - new HashMap>(); - - private boolean isStopped = false; - private final Set containersToPreempt = new HashSet(); - /** - * Count how many times the application has been given an opportunity - * to schedule a task at each priority. Each time the scheduler - * asks the application for a task at this priority, it is incremented, - * and each time the application successfully schedules a task, it - * is reset to 0. - */ - Multiset schedulingOpportunities = HashMultiset.create(); - - Multiset reReservations = HashMultiset.create(); - - Resource currentReservation = recordFactory - .newRecordInstance(Resource.class); - - private final RMContext rmContext; public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { - this.rmContext = rmContext; - this.appSchedulingInfo = - new AppSchedulingInfo(applicationAttemptId, user, queue, - activeUsersManager); - this.queue = queue; - } - - public ApplicationId getApplicationId() { - return this.appSchedulingInfo.getApplicationId(); - } - - @Override - public ApplicationAttemptId getApplicationAttemptId() { - return this.appSchedulingInfo.getApplicationAttemptId(); - } - - public String getUser() { - return this.appSchedulingInfo.getUser(); - } - - public synchronized void updateResourceRequests( - List requests) { - if (!isStopped) { - this.appSchedulingInfo.updateResourceRequests(requests); - } - } - - public synchronized void updateBlacklist( - List blacklistAdditions, List blacklistRemovals) { - if (!isStopped) { - this.appSchedulingInfo.updateBlacklist( - blacklistAdditions, blacklistRemovals); - } - } - - public Map getResourceRequests(Priority priority) { - return this.appSchedulingInfo.getResourceRequests(priority); - } - - public int getNewContainerId() { - return this.appSchedulingInfo.getNewContainerId(); - } - - public Collection getPriorities() { - return this.appSchedulingInfo.getPriorities(); - } - - public ResourceRequest getResourceRequest(Priority priority, String resourceName) { - return this.appSchedulingInfo.getResourceRequest(priority, resourceName); - } - - public synchronized int getTotalRequiredResources(Priority priority) { - return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers(); - } - - public Resource getResource(Priority priority) { - return this.appSchedulingInfo.getResource(priority); - } - - public boolean isBlacklisted(String resourceName) { - return this.appSchedulingInfo.isBlacklisted(resourceName); - } - - /** - * Is this application pending? - * @return true if it is else false. - */ - @Override - public boolean isPending() { - return this.appSchedulingInfo.isPending(); - } - - public synchronized boolean isStopped() { - return this.isStopped; - } - - public String getQueueName() { - return this.appSchedulingInfo.getQueueName(); - } - - /** - * Get the list of live containers - * @return All of the live containers - */ - @Override - public synchronized Collection getLiveContainers() { - return new ArrayList(liveContainers.values()); - } - - public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { - // Cleanup all scheduling information - this.isStopped = true; - this.appSchedulingInfo.stop(rmAppAttemptFinalState); - } - - public synchronized void containerLaunchedOnNode(ContainerId containerId, - NodeId nodeId) { - // Inform the container - RMContainer rmContainer = - getRMContainer(containerId); - if (rmContainer == null) { - // Some unknown container sneaked into the system. Kill it. - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent(nodeId, containerId)); - return; - } - - rmContainer.handle(new RMContainerEvent(containerId, - RMContainerEventType.LAUNCHED)); + super(applicationAttemptId, user, queue, activeUsersManager, rmContext); } synchronized public boolean containerCompleted(RMContainer rmContainer, @@ -310,133 +149,6 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, return rmContainer; } - - synchronized public List pullNewlyAllocatedContainers() { - List returnContainerList = new ArrayList( - newlyAllocatedContainers.size()); - for (RMContainer rmContainer : newlyAllocatedContainers) { - rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), - RMContainerEventType.ACQUIRED)); - returnContainerList.add(rmContainer.getContainer()); - } - newlyAllocatedContainers.clear(); - return returnContainerList; - } - - public Resource getCurrentConsumption() { - return this.currentConsumption; - } - - synchronized public void showRequests() { - if (LOG.isDebugEnabled()) { - for (Priority priority : getPriorities()) { - Map requests = getResourceRequests(priority); - if (requests != null) { - LOG.debug("showRequests:" + " application=" + getApplicationId() + - " headRoom=" + getHeadroom() + - " currentConsumption=" + currentConsumption.getMemory()); - for (ResourceRequest request : requests.values()) { - LOG.debug("showRequests:" + " application=" + getApplicationId() - + " request=" + request); - } - } - } - } - } - - public synchronized RMContainer getRMContainer(ContainerId id) { - return liveContainers.get(id); - } - - synchronized public void resetSchedulingOpportunities(Priority priority) { - this.schedulingOpportunities.setCount(priority, 0); - } - - synchronized public void addSchedulingOpportunity(Priority priority) { - this.schedulingOpportunities.setCount(priority, - schedulingOpportunities.count(priority) + 1); - } - - synchronized public void subtractSchedulingOpportunity(Priority priority) { - int count = schedulingOpportunities.count(priority) - 1; - this.schedulingOpportunities.setCount(priority, Math.max(count, 0)); - } - - /** - * @param priority Target priority - * @return the number of times the application has been given an opportunity - * to schedule a task at the given priority since the last time it - * successfully did so. - */ - synchronized public int getSchedulingOpportunities(Priority priority) { - return this.schedulingOpportunities.count(priority); - } - - synchronized void resetReReservations(Priority priority) { - this.reReservations.setCount(priority, 0); - } - - synchronized void addReReservation(Priority priority) { - this.reReservations.add(priority); - } - - synchronized public int getReReservations(Priority priority) { - return this.reReservations.count(priority); - } - - public synchronized int getNumReservedContainers(Priority priority) { - Map reservedContainers = - this.reservedContainers.get(priority); - return (reservedContainers == null) ? 0 : reservedContainers.size(); - } - - /** - * Get total current reservations. - * Used only by unit tests - * @return total current reservations - */ - @Stable - @Private - public synchronized Resource getCurrentReservation() { - return currentReservation; - } - - public synchronized RMContainer reserve(FiCaSchedulerNode node, Priority priority, - RMContainer rmContainer, Container container) { - // Create RMContainer if necessary - if (rmContainer == null) { - rmContainer = - new RMContainerImpl(container, getApplicationAttemptId(), - node.getNodeID(), rmContext.getDispatcher().getEventHandler(), - rmContext.getContainerAllocationExpirer()); - - Resources.addTo(currentReservation, container.getResource()); - - // Reset the re-reservation count - resetReReservations(priority); - } else { - // Note down the re-reservation - addReReservation(priority); - } - rmContainer.handle(new RMContainerReservedEvent(container.getId(), - container.getResource(), node.getNodeID(), priority)); - - Map reservedContainers = - this.reservedContainers.get(priority); - if (reservedContainers == null) { - reservedContainers = new HashMap(); - this.reservedContainers.put(priority, reservedContainers); - } - reservedContainers.put(node.getNodeID(), rmContainer); - - LOG.info("Application " + getApplicationId() - + " reserved container " + rmContainer - + " on node " + node + ", currently has " + reservedContainers.size() - + " at priority " + priority - + "; currentReservation " + currentReservation.getMemory()); - - return rmContainer; - } public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority) { Map reservedContainers = @@ -470,22 +182,6 @@ public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority) return false; } - /** - * Has the application reserved the given node at the - * given priority? - * @param node node to be checked - * @param priority priority of reserved container - * @return true is reserved, false if not - */ - public synchronized boolean isReserved(FiCaSchedulerNode node, Priority priority) { - Map reservedContainers = - this.reservedContainers.get(priority); - if (reservedContainers != null) { - return reservedContainers.containsKey(node.getNodeID()); - } - return false; - } - public synchronized float getLocalityWaitFactor( Priority priority, int clusterNodes) { // Estimate: Required unique resources (i.e. hosts + racks) @@ -497,41 +193,6 @@ public synchronized float getLocalityWaitFactor( return Math.min(((float)requiredResources / clusterNodes), 1.0f); } - /** - * Get the list of reserved containers - * @return All of the reserved containers. - */ - @Override - public synchronized List getReservedContainers() { - List reservedContainers = new ArrayList(); - for (Map.Entry> e : - this.reservedContainers.entrySet()) { - reservedContainers.addAll(e.getValue().values()); - } - return reservedContainers; - } - - public synchronized void setHeadroom(Resource globalLimit) { - this.resourceLimit = globalLimit; - } - - /** - * Get available headroom in terms of resources for the application's user. - * @return available resource headroom - */ - public synchronized Resource getHeadroom() { - // Corner case to deal with applications being slightly over-limit - if (resourceLimit.getMemory() < 0) { - resourceLimit.setMemory(0); - } - - return resourceLimit; - } - - public Queue getQueue() { - return queue; - } - public Resource getTotalPendingRequests() { Resource ret = Resource.newInstance(0, 0); for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index 8b5d454305d..caf2a97d712 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -18,10 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; @@ -30,7 +27,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -38,92 +34,39 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.HashMultiset; -import com.google.common.collect.Multiset; - +/** + * Represents an application attempt from the viewpoint of the Fair Scheduler. + */ @Private @Unstable public class FSSchedulerApp extends SchedulerApplication { private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class); - private final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); - - private final AppSchedulingInfo appSchedulingInfo; private AppSchedulable appSchedulable; - private final Queue queue; - - private final Resource currentConsumption = recordFactory - .newRecordInstance(Resource.class); - private Resource resourceLimit = recordFactory - .newRecordInstance(Resource.class); - - private Map liveContainers - = new HashMap(); - private List newlyAllocatedContainers = - new ArrayList(); - - final Map> reservedContainers = - new HashMap>(); final Map preemptionMap = new HashMap(); - - /** - * Count how many times the application has been given an opportunity - * to schedule a task at each priority. Each time the scheduler - * asks the application for a task at this priority, it is incremented, - * and each time the application successfully schedules a task, it - * is reset to 0. - */ - Multiset schedulingOpportunities = HashMultiset.create(); - Multiset reReservations = HashMultiset.create(); - - Resource currentReservation = recordFactory - .newRecordInstance(Resource.class); - - private final RMContext rmContext; public FSSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { - this.rmContext = rmContext; - this.appSchedulingInfo = - new AppSchedulingInfo(applicationAttemptId, user, queue, - activeUsersManager); - this.queue = queue; + super(applicationAttemptId, user, queue, activeUsersManager, rmContext); } - public ApplicationId getApplicationId() { - return appSchedulingInfo.getApplicationId(); - } - - @Override - public ApplicationAttemptId getApplicationAttemptId() { - return appSchedulingInfo.getApplicationAttemptId(); - } - public void setAppSchedulable(AppSchedulable appSchedulable) { this.appSchedulable = appSchedulable; } @@ -132,83 +75,6 @@ public AppSchedulable getAppSchedulable() { return appSchedulable; } - public String getUser() { - return appSchedulingInfo.getUser(); - } - - public synchronized void updateResourceRequests( - List requests) { - this.appSchedulingInfo.updateResourceRequests(requests); - } - - public Map getResourceRequests(Priority priority) { - return appSchedulingInfo.getResourceRequests(priority); - } - - public int getNewContainerId() { - return appSchedulingInfo.getNewContainerId(); - } - - public Collection getPriorities() { - return appSchedulingInfo.getPriorities(); - } - - public ResourceRequest getResourceRequest(Priority priority, String nodeAddress) { - return appSchedulingInfo.getResourceRequest(priority, nodeAddress); - } - - public synchronized int getTotalRequiredResources(Priority priority) { - return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers(); - } - - public Resource getResource(Priority priority) { - return appSchedulingInfo.getResource(priority); - } - - /** - * Is this application pending? - * @return true if it is else false. - */ - @Override - public boolean isPending() { - return appSchedulingInfo.isPending(); - } - - public String getQueueName() { - return appSchedulingInfo.getQueueName(); - } - - /** - * Get the list of live containers - * @return All of the live containers - */ - @Override - public synchronized Collection getLiveContainers() { - return new ArrayList(liveContainers.values()); - } - - public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { - // Cleanup all scheduling information - appSchedulingInfo.stop(rmAppAttemptFinalState); - } - - @SuppressWarnings("unchecked") - public synchronized void containerLaunchedOnNode(ContainerId containerId, - NodeId nodeId) { - // Inform the container - RMContainer rmContainer = - getRMContainer(containerId); - if (rmContainer == null) { - // Some unknown container sneaked into the system. Kill it. - rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent(nodeId, containerId)); - return; - } - - rmContainer.handle(new RMContainerEvent(containerId, - RMContainerEventType.LAUNCHED)); - } - synchronized public void containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { @@ -241,122 +107,6 @@ synchronized public void containerCompleted(RMContainer rmContainer, preemptionMap.remove(rmContainer); } - synchronized public List pullNewlyAllocatedContainers() { - List returnContainerList = new ArrayList( - newlyAllocatedContainers.size()); - for (RMContainer rmContainer : newlyAllocatedContainers) { - rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), - RMContainerEventType.ACQUIRED)); - returnContainerList.add(rmContainer.getContainer()); - } - newlyAllocatedContainers.clear(); - return returnContainerList; - } - - public Resource getCurrentConsumption() { - return this.currentConsumption; - } - - synchronized public void showRequests() { - if (LOG.isDebugEnabled()) { - for (Priority priority : getPriorities()) { - Map requests = getResourceRequests(priority); - if (requests != null) { - LOG.debug("showRequests:" + " application=" + getApplicationId() + - " headRoom=" + getHeadroom() + - " currentConsumption=" + currentConsumption.getMemory()); - for (ResourceRequest request : requests.values()) { - LOG.debug("showRequests:" + " application=" + getApplicationId() - + " request=" + request); - } - } - } - } - } - - public synchronized RMContainer getRMContainer(ContainerId id) { - return liveContainers.get(id); - } - - synchronized public void addSchedulingOpportunity(Priority priority) { - schedulingOpportunities.setCount(priority, - schedulingOpportunities.count(priority) + 1); - } - - /** - * Return the number of times the application has been given an opportunity - * to schedule a task at the given priority since the last time it - * successfully did so. - */ - synchronized public int getSchedulingOpportunities(Priority priority) { - return schedulingOpportunities.count(priority); - } - - synchronized void resetReReservations(Priority priority) { - reReservations.setCount(priority, 0); - } - - synchronized void addReReservation(Priority priority) { - reReservations.add(priority); - } - - synchronized public int getReReservations(Priority priority) { - return reReservations.count(priority); - } - - public synchronized int getNumReservedContainers(Priority priority) { - Map reservedContainers = - this.reservedContainers.get(priority); - return (reservedContainers == null) ? 0 : reservedContainers.size(); - } - - /** - * Get total current reservations. - * Used only by unit tests - * @return total current reservations - */ - @VisibleForTesting - public synchronized Resource getCurrentReservation() { - return currentReservation; - } - - public synchronized RMContainer reserve(FSSchedulerNode node, Priority priority, - RMContainer rmContainer, Container container) { - // Create RMContainer if necessary - if (rmContainer == null) { - rmContainer = - new RMContainerImpl(container, getApplicationAttemptId(), - node.getNodeID(), rmContext.getDispatcher().getEventHandler(), - rmContext.getContainerAllocationExpirer()); - - Resources.addTo(currentReservation, container.getResource()); - - // Reset the re-reservation count - resetReReservations(priority); - } else { - // Note down the re-reservation - addReReservation(priority); - } - rmContainer.handle(new RMContainerReservedEvent(container.getId(), - container.getResource(), node.getNodeID(), priority)); - - Map reservedContainers = - this.reservedContainers.get(priority); - if (reservedContainers == null) { - reservedContainers = new HashMap(); - this.reservedContainers.put(priority, reservedContainers); - } - reservedContainers.put(node.getNodeID(), rmContainer); - - LOG.info("Application " + getApplicationId() - + " reserved container " + rmContainer - + " on node " + node + ", currently has " + reservedContainers.size() - + " at priority " + priority - + "; currentReservation " + currentReservation.getMemory()); - - return rmContainer; - } - public synchronized void unreserve(FSSchedulerNode node, Priority priority) { Map reservedContainers = this.reservedContainers.get(priority); @@ -376,22 +126,6 @@ public synchronized void unreserve(FSSchedulerNode node, Priority priority) { + priority + "; currentReservation " + currentReservation); } - /** - * Has the application reserved the given node at the - * given priority? - * @param node node to be checked - * @param priority priority of reserved container - * @return true is reserved, false if not - */ - public synchronized boolean isReserved(FSSchedulerNode node, Priority priority) { - Map reservedContainers = - this.reservedContainers.get(priority); - if (reservedContainers != null) { - return reservedContainers.containsKey(node.getNodeID()); - } - return false; - } - public synchronized float getLocalityWaitFactor( Priority priority, int clusterNodes) { // Estimate: Required unique resources (i.e. hosts + racks) @@ -402,42 +136,7 @@ public synchronized float getLocalityWaitFactor( // i.e. no point skipping more than clustersize opportunities return Math.min(((float)requiredResources / clusterNodes), 1.0f); } - - /** - * Get the list of reserved containers - * @return All of the reserved containers. - */ - @Override - public synchronized List getReservedContainers() { - List reservedContainers = new ArrayList(); - for (Map.Entry> e : - this.reservedContainers.entrySet()) { - reservedContainers.addAll(e.getValue().values()); - } - return reservedContainers; - } - public synchronized void setHeadroom(Resource globalLimit) { - this.resourceLimit = globalLimit; - } - - /** - * Get available headroom in terms of resources for the application's user. - * @return available resource headroom - */ - public synchronized Resource getHeadroom() { - // Corner case to deal with applications being slightly over-limit - if (resourceLimit.getMemory() < 0) { - resourceLimit.setMemory(0); - } - - return resourceLimit; - } - - public Queue getQueue() { - return queue; - } - /** * Delay scheduling: We often want to prioritize scheduling of node-local * containers over rack-local or off-switch containers. To acheive this @@ -453,26 +152,6 @@ public Queue getQueue() { final Map allowedLocalityLevel = new HashMap< Priority, NodeType>(); - // Time of the last container scheduled at the current allowed level - Map lastScheduledContainer = new HashMap(); - - /** - * Should be called when an application has successfully scheduled a container, - * or when the scheduling locality threshold is relaxed. - * Reset various internal counters which affect delay scheduling - * - * @param priority The priority of the container scheduled. - */ - synchronized public void resetSchedulingOpportunities(Priority priority) { - resetSchedulingOpportunities(priority, System.currentTimeMillis()); - } - // used for continuous scheduling - synchronized public void resetSchedulingOpportunities(Priority priority, - long currentTimeMs) { - lastScheduledContainer.put(priority, currentTimeMs); - schedulingOpportunities.setCount(priority, 0); - } - /** * Return the level at which we are allowed to schedule containers, given the * current size of the cluster and thresholds indicating how many nodes to