Merge r1535533 through r1535791 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1535792 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2013-10-25 17:23:11 +00:00
commit 9043a92219
9 changed files with 513 additions and 711 deletions

DFSClientCache.java

@@ -20,15 +20,19 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSInputStream;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -41,15 +45,52 @@ import com.google.common.cache.RemovalNotification;
 class DFSClientCache {
   private static final Log LOG = LogFactory.getLog(DFSClientCache.class);
   /**
-   * Cache that maps User id to corresponding DFSClient.
+   * Cache that maps User id to the corresponding DFSClient.
    */
   @VisibleForTesting
   final LoadingCache<String, DFSClient> clientCache;
 
   final static int DEFAULT_DFS_CLIENT_CACHE_SIZE = 256;
 
+  /**
+   * Cache that maps <DFSClient, inode path> to the corresponding
+   * FSDataInputStream.
+   */
+  final LoadingCache<DFSInputStreamCaheKey, FSDataInputStream> inputstreamCache;
+
+  /**
+   * Time to live for a DFSClient (in seconds)
+   */
+  final static int DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE = 1024;
+  final static int DEFAULT_DFS_INPUTSTREAM_CACHE_TTL = 10 * 60;
+
   private final Configuration config;
 
+  private static class DFSInputStreamCaheKey {
+    final String userId;
+    final String inodePath;
+
+    private DFSInputStreamCaheKey(String userId, String inodePath) {
+      super();
+      this.userId = userId;
+      this.inodePath = inodePath;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof DFSInputStreamCaheKey) {
+        DFSInputStreamCaheKey k = (DFSInputStreamCaheKey) obj;
+        return userId.equals(k.userId) && inodePath.equals(k.inodePath);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(userId, inodePath);
+    }
+  }
+
   DFSClientCache(Configuration config) {
     this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE);
   }
@@ -60,6 +101,12 @@ class DFSClientCache {
         .maximumSize(clientCache)
         .removalListener(clientRemovealListener())
         .build(clientLoader());
+
+    this.inputstreamCache = CacheBuilder.newBuilder()
+        .maximumSize(DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE)
+        .expireAfterAccess(DEFAULT_DFS_INPUTSTREAM_CACHE_TTL, TimeUnit.SECONDS)
+        .removalListener(inputStreamRemovalListener())
+        .build(inputStreamLoader());
   }
 
   private CacheLoader<String, DFSClient> clientLoader() {
@@ -95,7 +142,33 @@ class DFSClientCache {
     };
   }
 
-  DFSClient get(String userName) {
+  private RemovalListener<DFSInputStreamCaheKey, FSDataInputStream> inputStreamRemovalListener() {
+    return new RemovalListener<DFSClientCache.DFSInputStreamCaheKey, FSDataInputStream>() {
+      @Override
+      public void onRemoval(
+          RemovalNotification<DFSInputStreamCaheKey, FSDataInputStream> notification) {
+        try {
+          notification.getValue().close();
+        } catch (IOException e) {
+        }
+      }
+    };
+  }
+
+  private CacheLoader<DFSInputStreamCaheKey, FSDataInputStream> inputStreamLoader() {
+    return new CacheLoader<DFSInputStreamCaheKey, FSDataInputStream>() {
+      @Override
+      public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception {
+        DFSClient client = getDfsClient(key.userId);
+        DFSInputStream dis = client.open(key.inodePath);
+        return new FSDataInputStream(dis);
+      }
+    };
+  }
+
+  DFSClient getDfsClient(String userName) {
     DFSClient client = null;
     try {
       client = clientCache.get(userName);
@@ -105,4 +178,21 @@ class DFSClientCache {
     }
     return client;
   }
+
+  FSDataInputStream getDfsInputStream(String userName, String inodePath) {
+    DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath);
+    FSDataInputStream s = null;
+    try {
+      s = inputstreamCache.get(k);
+    } catch (ExecutionException e) {
+      LOG.warn("Failed to create DFSInputStream for user:" + userName
+          + " Cause:" + e);
+    }
+    return s;
+  }
+
+  public void invalidateDfsInputStream(String userName, String inodePath) {
+    DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath);
+    inputstreamCache.invalidate(k);
+  }
 }
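The class above layers two Guava caches: a small LRU map of per-user DFSClient instances, and a larger cache of open input streams that expires entries after a period of inactivity and closes the evicted stream from its removal listener. Below is a minimal, self-contained sketch of that second pattern; the Stream type, key type, and sizes are illustrative stand-ins, not part of the commit.

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

public class StreamCacheSketch {
  // Hypothetical stand-in for FSDataInputStream.
  static class Stream implements Closeable {
    final String path;
    Stream(String path) { this.path = path; }
    @Override
    public void close() throws IOException { System.out.println("closed " + path); }
  }

  // Streams are opened lazily on first get(), dropped after 10 minutes of
  // inactivity or once 1024 entries are cached, and closed on eviction.
  final LoadingCache<String, Stream> streams = CacheBuilder.newBuilder()
      .maximumSize(1024)
      .expireAfterAccess(10 * 60, TimeUnit.SECONDS)
      .removalListener(new RemovalListener<String, Stream>() {
        @Override
        public void onRemoval(RemovalNotification<String, Stream> n) {
          try {
            n.getValue().close();   // release the evicted stream
          } catch (IOException e) {
            // best effort: eviction must not propagate close failures
          }
        }
      })
      .build(new CacheLoader<String, Stream>() {
        @Override
        public Stream load(String path) {
          return new Stream(path);  // open on first access
        }
      });

  public static void main(String[] args) throws ExecutionException {
    StreamCacheSketch c = new StreamCacheSketch();
    Stream s1 = c.streams.get("/a");          // opens the stream
    Stream s2 = c.streams.get("/a");          // cache hit, same instance
    System.out.println("shared: " + (s1 == s2));
    c.streams.invalidate("/a");               // fires the removal listener
  }
}

Sharing one stream per (user, inode) pair is what lets many NFS READ requests hit the same positioned reader instead of paying an open/close round trip per request.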

RpcProgramNfs3.java

@@ -39,7 +39,6 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -235,7 +234,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -310,7 +309,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -392,7 +391,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -454,7 +453,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -502,7 +501,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -563,13 +562,14 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   public READ3Response read(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
+    final String userName = securityHandler.getUser();
 
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(userName);
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -628,11 +628,28 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       int buffSize = Math.min(MAX_READ_TRANSFER_SIZE, count);
       byte[] readbuffer = new byte[buffSize];
 
-      DFSInputStream is = dfsClient.open(Nfs3Utils.getFileIdPath(handle));
-      FSDataInputStream fis = new FSDataInputStream(is);
-      int readCount = fis.read(offset, readbuffer, 0, count);
-      fis.close();
+      int readCount = 0;
+      /**
+       * Retry exactly once because the DFSInputStream can be stale.
+       */
+      for (int i = 0; i < 1; ++i) {
+        FSDataInputStream fis = clientCache.getDfsInputStream(userName,
+            Nfs3Utils.getFileIdPath(handle));
+
+        try {
+          readCount = fis.read(offset, readbuffer, 0, count);
+        } catch (IOException e) {
+          // TODO: A cleaner way is to throw a new type of exception
+          // which requires incompatible changes.
+          if (e.getMessage() == "Stream closed") {
+            clientCache.invalidateDfsInputStream(userName,
+                Nfs3Utils.getFileIdPath(handle));
+            continue;
+          } else {
+            throw e;
+          }
+        }
+      }
 
       attrs = Nfs3Utils.getFileAttr(dfsClient, Nfs3Utils.getFileIdPath(handle),
           iug);
@@ -660,7 +677,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       SecurityHandler securityHandler, InetAddress client) {
     WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK);
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -735,7 +752,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -858,7 +875,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   public MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -954,7 +971,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   public REMOVE3Response remove(XDR xdr,
       SecurityHandler securityHandler, InetAddress client) {
     REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1029,7 +1046,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1111,7 +1128,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1205,7 +1222,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1293,7 +1310,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1430,7 +1447,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
     }
@@ -1587,7 +1604,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1645,7 +1662,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1697,7 +1714,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1738,7 +1755,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   public COMMIT3Response commit(XDR xdr, Channel channel, int xid,
       SecurityHandler securityHandler, InetAddress client) {
     COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
    if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
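The new read path obtains the stream from the cache instead of opening and closing the file on every READ. Two details in the committed loop are worth noting. First, `e.getMessage() == "Stream closed"` compares references, not contents; it happens to match when the throwing code used the identical string literal, but `equals` is the conventional check. Second, with a bound of `i < 1` the loop appears to make only a single pass, so a stale stream is invalidated but the reopened stream is only picked up by the client's next READ. A hedged sketch of a loop that performs the retry in place; the StreamCache and Reader types are illustrative, not from the patch:

import java.io.IOException;

public class RetryOnceReadSketch {
  // Hypothetical cache interface mirroring the shape used above.
  interface StreamCache {
    Reader get(String path) throws IOException;
    void invalidate(String path);
  }

  interface Reader {
    int read(long offset, byte[] buf, int off, int len) throws IOException;
  }

  // Read with a single in-place retry: a stale cached reader fails once,
  // the entry is invalidated, and the second pass reopens it via the cache.
  static int read(StreamCache cache, String path, long offset, byte[] buf)
      throws IOException {
    int n = 0;
    for (int attempt = 0; attempt < 2; ++attempt) {
      Reader r = cache.get(path);
      try {
        n = r.read(offset, buf, 0, buf.length);
        break;                          // success: no retry needed
      } catch (IOException e) {
        if (attempt == 0 && "Stream closed".equals(e.getMessage())) {
          cache.invalidate(path);       // drop the stale entry and retry
          continue;
        }
        throw e;                        // second failure, or another cause
      }
    }
    return n;
  }
}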

TestDFSClientCache.java

@@ -39,12 +39,12 @@ public class TestDFSClientCache {
     DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE);
 
-    DFSClient c1 = cache.get("test1");
-    assertTrue(cache.get("test1").toString().contains("ugi=test1"));
-    assertEquals(c1, cache.get("test1"));
+    DFSClient c1 = cache.getDfsClient("test1");
+    assertTrue(cache.getDfsClient("test1").toString().contains("ugi=test1"));
+    assertEquals(c1, cache.getDfsClient("test1"));
     assertFalse(isDfsClientClose(c1));
 
-    cache.get("test2");
+    cache.getDfsClient("test2");
     assertTrue(isDfsClientClose(c1));
 
     assertEquals(MAX_CACHE_SIZE - 1, cache.clientCache.size());
   }
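The test depends on Guava evicting the least-recently-used client once the cache grows past `maximumSize`, firing the removal listener that closes it. A standalone illustration of that eviction behavior follows; the string values stand in for DFSClient, and note that Guava's exact eviction order is an implementation detail rather than a guarantee:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

public class EvictionSketch {
  public static void main(String[] args) throws ExecutionException {
    final AtomicInteger closed = new AtomicInteger();
    LoadingCache<String, String> cache = CacheBuilder.newBuilder()
        .maximumSize(2)
        .removalListener(new RemovalListener<String, String>() {
          @Override
          public void onRemoval(RemovalNotification<String, String> n) {
            closed.incrementAndGet();   // stands in for DFSClient.close()
          }
        })
        .build(new CacheLoader<String, String>() {
          @Override
          public String load(String user) { return "client-" + user; }
        });

    cache.get("a");
    cache.get("b");
    cache.get("c");                     // exceeds maximumSize(2): one eviction
    System.out.println("evicted=" + closed.get() + " size=" + cache.size());
  }
}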

CHANGES.txt (hadoop-hdfs)

@@ -440,6 +440,9 @@ Release 2.2.1 - UNRELEASED
     HDFS-5403. WebHdfs client cannot communicate with older WebHdfs servers
     post HDFS-5306. (atm)
 
+    HDFS-5171. NFS should create input stream for a file and try to share it
+    with multiple read requests. (Haohui Mai via brandonli)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

CHANGES.txt (hadoop-yarn)

@@ -108,6 +108,9 @@ Release 2.2.1 - UNRELEASED
     YARN-1315. TestQueueACLs should also test FairScheduler (Sandy Ryza)
 
+    YARN-1335. Move duplicate code from FSSchedulerApp and FiCaSchedulerApp
+    into SchedulerApplication (Sandy Ryza)
+
   OPTIMIZATIONS
 
   BUG FIXES

SchedulerApplication.java

@@ -17,44 +17,385 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
 
 /**
- * Represents an Application from the viewpoint of the scheduler.
- * Each running Application in the RM corresponds to one instance
+ * Represents an application attempt from the viewpoint of the scheduler.
+ * Each running app attempt in the RM corresponds to one instance
  * of this class.
  */
 @Private
 @Unstable
 public abstract class SchedulerApplication {
+
+  private static final Log LOG = LogFactory.getLog(SchedulerApplication.class);
+
+  protected final AppSchedulingInfo appSchedulingInfo;
+
+  protected final Map<ContainerId, RMContainer> liveContainers =
+      new HashMap<ContainerId, RMContainer>();
+  protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
+      new HashMap<Priority, Map<NodeId, RMContainer>>();
+
+  private final Multiset<Priority> reReservations = HashMultiset.create();
+
+  protected final Resource currentReservation = Resource.newInstance(0, 0);
+  private Resource resourceLimit = Resource.newInstance(0, 0);
+  protected final Resource currentConsumption = Resource.newInstance(0, 0);
+
+  protected List<RMContainer> newlyAllocatedContainers =
+      new ArrayList<RMContainer>();
+
   /**
-   * Get {@link ApplicationAttemptId} of the application master.
-   * @return <code>ApplicationAttemptId</code> of the application master
+   * Count how many times the application has been given an opportunity
+   * to schedule a task at each priority. Each time the scheduler
+   * asks the application for a task at this priority, it is incremented,
+   * and each time the application successfully schedules a task, it
+   * is reset to 0.
    */
-  public abstract ApplicationAttemptId getApplicationAttemptId();
+  Multiset<Priority> schedulingOpportunities = HashMultiset.create();
+
+  // Time of the last container scheduled at the current allowed level
+  protected Map<Priority, Long> lastScheduledContainer =
+      new HashMap<Priority, Long>();
+
+  protected final Queue queue;
+  protected boolean isStopped = false;
+
+  protected final RMContext rmContext;
+
+  public SchedulerApplication(ApplicationAttemptId applicationAttemptId,
+      String user, Queue queue, ActiveUsersManager activeUsersManager,
+      RMContext rmContext) {
+    this.rmContext = rmContext;
+    this.appSchedulingInfo =
+        new AppSchedulingInfo(applicationAttemptId, user, queue,
+            activeUsersManager);
+    this.queue = queue;
+  }
 
   /**
    * Get the live containers of the application.
    * @return live containers of the application
    */
-  public abstract Collection<RMContainer> getLiveContainers();
+  public synchronized Collection<RMContainer> getLiveContainers() {
+    return new ArrayList<RMContainer>(liveContainers.values());
+  }
 
-  /**
-   * Get the reserved containers of the application.
-   * @return the reserved containers of the application
-   */
-  public abstract Collection<RMContainer> getReservedContainers();
-
   /**
    * Is this application pending?
    * @return true if it is else false.
    */
-  public abstract boolean isPending();
+  public boolean isPending() {
+    return appSchedulingInfo.isPending();
+  }
+
+  /**
+   * Get {@link ApplicationAttemptId} of the application master.
+   * @return <code>ApplicationAttemptId</code> of the application master
+   */
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return appSchedulingInfo.getApplicationAttemptId();
+  }
+
+  public ApplicationId getApplicationId() {
+    return appSchedulingInfo.getApplicationId();
+  }
+
+  public String getUser() {
+    return appSchedulingInfo.getUser();
+  }
+
+  public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
+    return appSchedulingInfo.getResourceRequests(priority);
+  }
+
+  public int getNewContainerId() {
+    return appSchedulingInfo.getNewContainerId();
+  }
+
+  public Collection<Priority> getPriorities() {
+    return appSchedulingInfo.getPriorities();
+  }
+
+  public ResourceRequest getResourceRequest(Priority priority, String resourceName) {
+    return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
+  }
+
+  public synchronized int getTotalRequiredResources(Priority priority) {
+    return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
+  }
+
+  public Resource getResource(Priority priority) {
+    return appSchedulingInfo.getResource(priority);
+  }
+
+  public String getQueueName() {
+    return appSchedulingInfo.getQueueName();
+  }
+
+  public synchronized RMContainer getRMContainer(ContainerId id) {
+    return liveContainers.get(id);
+  }
+
+  protected synchronized void resetReReservations(Priority priority) {
+    reReservations.setCount(priority, 0);
+  }
+
+  protected synchronized void addReReservation(Priority priority) {
+    reReservations.add(priority);
+  }
+
+  public synchronized int getReReservations(Priority priority) {
+    return reReservations.count(priority);
+  }
+
+  /**
+   * Get total current reservations.
+   * Used only by unit tests
+   * @return total current reservations
+   */
+  @Stable
+  @Private
+  public synchronized Resource getCurrentReservation() {
+    return currentReservation;
+  }
+
+  public Queue getQueue() {
+    return queue;
+  }
+
+  public synchronized void updateResourceRequests(
+      List<ResourceRequest> requests) {
+    if (!isStopped) {
+      appSchedulingInfo.updateResourceRequests(requests);
+    }
+  }
+
+  public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
+    // Cleanup all scheduling information
+    isStopped = true;
+    appSchedulingInfo.stop(rmAppAttemptFinalState);
+  }
+
+  public synchronized boolean isStopped() {
+    return isStopped;
+  }
+
+  /**
+   * Get the list of reserved containers
+   * @return All of the reserved containers.
+   */
+  public synchronized List<RMContainer> getReservedContainers() {
+    List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
+    for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
+        this.reservedContainers.entrySet()) {
+      reservedContainers.addAll(e.getValue().values());
+    }
+    return reservedContainers;
+  }
+
+  public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
+      RMContainer rmContainer, Container container) {
+    // Create RMContainer if necessary
+    if (rmContainer == null) {
+      rmContainer =
+          new RMContainerImpl(container, getApplicationAttemptId(),
+              node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
+              rmContext.getContainerAllocationExpirer());
+
+      Resources.addTo(currentReservation, container.getResource());
+
+      // Reset the re-reservation count
+      resetReReservations(priority);
+    } else {
+      // Note down the re-reservation
+      addReReservation(priority);
+    }
+    rmContainer.handle(new RMContainerReservedEvent(container.getId(),
+        container.getResource(), node.getNodeID(), priority));
+
+    Map<NodeId, RMContainer> reservedContainers =
+        this.reservedContainers.get(priority);
+    if (reservedContainers == null) {
+      reservedContainers = new HashMap<NodeId, RMContainer>();
+      this.reservedContainers.put(priority, reservedContainers);
+    }
+    reservedContainers.put(node.getNodeID(), rmContainer);
+
+    LOG.info("Application " + getApplicationId()
+        + " reserved container " + rmContainer
+        + " on node " + node + ", currently has " + reservedContainers.size()
+        + " at priority " + priority
+        + "; currentReservation " + currentReservation.getMemory());
+
+    return rmContainer;
+  }
+
+  /**
+   * Has the application reserved the given <code>node</code> at the
+   * given <code>priority</code>?
+   * @param node node to be checked
+   * @param priority priority of reserved container
+   * @return true is reserved, false if not
+   */
+  public synchronized boolean isReserved(SchedulerNode node, Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers =
+        this.reservedContainers.get(priority);
+    if (reservedContainers != null) {
+      return reservedContainers.containsKey(node.getNodeID());
+    }
+    return false;
+  }
+
+  public synchronized void setHeadroom(Resource globalLimit) {
+    this.resourceLimit = globalLimit;
+  }
+
+  /**
+   * Get available headroom in terms of resources for the application's user.
+   * @return available resource headroom
+   */
+  public synchronized Resource getHeadroom() {
+    // Corner case to deal with applications being slightly over-limit
+    if (resourceLimit.getMemory() < 0) {
+      resourceLimit.setMemory(0);
+    }
+    return resourceLimit;
+  }
+
+  public synchronized int getNumReservedContainers(Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers =
+        this.reservedContainers.get(priority);
+    return (reservedContainers == null) ? 0 : reservedContainers.size();
+  }
+
+  @SuppressWarnings("unchecked")
+  public synchronized void containerLaunchedOnNode(ContainerId containerId,
+      NodeId nodeId) {
+    // Inform the container
+    RMContainer rmContainer = getRMContainer(containerId);
+    if (rmContainer == null) {
+      // Some unknown container sneaked into the system. Kill it.
+      rmContext.getDispatcher().getEventHandler()
+          .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
+      return;
+    }
+
+    rmContainer.handle(new RMContainerEvent(containerId,
+        RMContainerEventType.LAUNCHED));
+  }
+
+  public synchronized void showRequests() {
+    if (LOG.isDebugEnabled()) {
+      for (Priority priority : getPriorities()) {
+        Map<String, ResourceRequest> requests = getResourceRequests(priority);
+        if (requests != null) {
+          LOG.debug("showRequests:" + " application=" + getApplicationId() +
+              " headRoom=" + getHeadroom() +
+              " currentConsumption=" + currentConsumption.getMemory());
+          for (ResourceRequest request : requests.values()) {
+            LOG.debug("showRequests:" + " application=" + getApplicationId()
+                + " request=" + request);
+          }
+        }
+      }
+    }
+  }
+
+  public Resource getCurrentConsumption() {
+    return currentConsumption;
+  }
+
+  public synchronized List<Container> pullNewlyAllocatedContainers() {
+    List<Container> returnContainerList = new ArrayList<Container>(
+        newlyAllocatedContainers.size());
+    for (RMContainer rmContainer : newlyAllocatedContainers) {
+      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
+          RMContainerEventType.ACQUIRED));
+      returnContainerList.add(rmContainer.getContainer());
+    }
+    newlyAllocatedContainers.clear();
+    return returnContainerList;
+  }
+
+  public synchronized void updateBlacklist(
+      List<String> blacklistAdditions, List<String> blacklistRemovals) {
+    if (!isStopped) {
+      this.appSchedulingInfo.updateBlacklist(
+          blacklistAdditions, blacklistRemovals);
+    }
+  }
+
+  public boolean isBlacklisted(String resourceName) {
+    return this.appSchedulingInfo.isBlacklisted(resourceName);
+  }
+
+  public synchronized void addSchedulingOpportunity(Priority priority) {
+    schedulingOpportunities.setCount(priority,
+        schedulingOpportunities.count(priority) + 1);
+  }
+
+  public synchronized void subtractSchedulingOpportunity(Priority priority) {
+    int count = schedulingOpportunities.count(priority) - 1;
+    this.schedulingOpportunities.setCount(priority, Math.max(count, 0));
+  }
+
+  /**
+   * Return the number of times the application has been given an opportunity
+   * to schedule a task at the given priority since the last time it
+   * successfully did so.
+   */
+  public synchronized int getSchedulingOpportunities(Priority priority) {
+    return schedulingOpportunities.count(priority);
+  }
+
+  /**
+   * Should be called when an application has successfully scheduled a container,
+   * or when the scheduling locality threshold is relaxed.
+   * Reset various internal counters which affect delay scheduling
+   *
+   * @param priority The priority of the container scheduled.
+   */
+  public synchronized void resetSchedulingOpportunities(Priority priority) {
+    resetSchedulingOpportunities(priority, System.currentTimeMillis());
+  }
+
+  // used for continuous scheduling
+  public synchronized void resetSchedulingOpportunities(Priority priority,
+      long currentTimeMs) {
+    lastScheduledContainer.put(priority, currentTimeMs);
+    schedulingOpportunities.setCount(priority, 0);
+  }
 }
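The consolidated class keeps its delay-scheduling state in a Guava Multiset: each time the scheduler offers the application a chance at some priority the count is incremented, a successful placement resets it to zero, and schedulers compare the count against a locality threshold before relaxing from node-local to rack-local placement. A compact sketch of that accounting follows; the threshold method and int priorities are illustrative, not part of the patch:

import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;

public class DelaySchedulingSketch {
  private final Multiset<Integer> opportunities = HashMultiset.create();

  void addSchedulingOpportunity(int priority) {
    opportunities.add(priority);            // one more offered-but-unused chance
  }

  void resetSchedulingOpportunities(int priority) {
    opportunities.setCount(priority, 0);    // container placed: start over
  }

  // Relax locality once the app has passed up enough offers: an app that
  // skipped `threshold` nodes waiting for a local one may go rack-local.
  boolean allowRackLocal(int priority, int threshold) {
    return opportunities.count(priority) >= threshold;
  }

  public static void main(String[] args) {
    DelaySchedulingSketch s = new DelaySchedulingSketch();
    for (int i = 0; i < 3; i++) {
      s.addSchedulingOpportunity(1);        // three non-local offers
    }
    System.out.println(s.allowRackLocal(1, 3)); // true: threshold reached
    s.resetSchedulingOpportunities(1);
    System.out.println(s.allowRackLocal(1, 3)); // false after reset
  }
}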

SchedulerNode.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -72,4 +73,11 @@ public abstract class SchedulerNode {
    * @return total resources on the node.
    */
   public abstract Resource getTotalResource();
+
+  /**
+   * Get the ID of the node which contains both its hostname and port.
+   * @return the ID of the node
+   */
+  public abstract NodeId getNodeID();
+
 }

FiCaSchedulerApp.java

@@ -18,22 +18,16 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -41,194 +35,39 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
-
 /**
- * Represents an Application from the viewpoint of the scheduler.
- * Each running Application in the RM corresponds to one instance
- * of this class.
+ * Represents an application attempt from the viewpoint of the FIFO or Capacity
+ * scheduler.
  */
-@SuppressWarnings("unchecked")
 @Private
 @Unstable
 public class FiCaSchedulerApp extends SchedulerApplication {
 
   private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
 
-  private final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
-
-  private final AppSchedulingInfo appSchedulingInfo;
-  private final Queue queue;
-
-  private final Resource currentConsumption = recordFactory
-      .newRecordInstance(Resource.class);
-  private Resource resourceLimit = recordFactory
-      .newRecordInstance(Resource.class);
-
-  private Map<ContainerId, RMContainer> liveContainers =
-      new HashMap<ContainerId, RMContainer>();
-  private List<RMContainer> newlyAllocatedContainers =
-      new ArrayList<RMContainer>();
-
-  final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
-      new HashMap<Priority, Map<NodeId, RMContainer>>();
-
-  private boolean isStopped = false;
-
   private final Set<ContainerId> containersToPreempt =
       new HashSet<ContainerId>();
 
-  /**
-   * Count how many times the application has been given an opportunity
-   * to schedule a task at each priority. Each time the scheduler
-   * asks the application for a task at this priority, it is incremented,
-   * and each time the application successfully schedules a task, it
-   * is reset to 0.
-   */
-  Multiset<Priority> schedulingOpportunities = HashMultiset.create();
-
-  Multiset<Priority> reReservations = HashMultiset.create();
-
-  Resource currentReservation = recordFactory
-      .newRecordInstance(Resource.class);
-
-  private final RMContext rmContext;
-
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
-    this.rmContext = rmContext;
-    this.appSchedulingInfo =
-        new AppSchedulingInfo(applicationAttemptId, user, queue,
-            activeUsersManager);
-    this.queue = queue;
-  }
-
-  public ApplicationId getApplicationId() {
-    return this.appSchedulingInfo.getApplicationId();
-  }
-
-  @Override
-  public ApplicationAttemptId getApplicationAttemptId() {
-    return this.appSchedulingInfo.getApplicationAttemptId();
-  }
-
-  public String getUser() {
-    return this.appSchedulingInfo.getUser();
-  }
-
-  public synchronized void updateResourceRequests(
-      List<ResourceRequest> requests) {
-    if (!isStopped) {
-      this.appSchedulingInfo.updateResourceRequests(requests);
-    }
-  }
-
-  public synchronized void updateBlacklist(
-      List<String> blacklistAdditions, List<String> blacklistRemovals) {
-    if (!isStopped) {
-      this.appSchedulingInfo.updateBlacklist(
-          blacklistAdditions, blacklistRemovals);
-    }
-  }
-
-  public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
-    return this.appSchedulingInfo.getResourceRequests(priority);
-  }
-
-  public int getNewContainerId() {
-    return this.appSchedulingInfo.getNewContainerId();
-  }
-
-  public Collection<Priority> getPriorities() {
-    return this.appSchedulingInfo.getPriorities();
-  }
-
-  public ResourceRequest getResourceRequest(Priority priority, String resourceName) {
-    return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
-  }
-
-  public synchronized int getTotalRequiredResources(Priority priority) {
-    return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
-  }
-
-  public Resource getResource(Priority priority) {
-    return this.appSchedulingInfo.getResource(priority);
-  }
-
-  public boolean isBlacklisted(String resourceName) {
-    return this.appSchedulingInfo.isBlacklisted(resourceName);
-  }
-
-  /**
-   * Is this application pending?
-   * @return true if it is else false.
-   */
-  @Override
-  public boolean isPending() {
-    return this.appSchedulingInfo.isPending();
-  }
-
-  public synchronized boolean isStopped() {
-    return this.isStopped;
-  }
-
-  public String getQueueName() {
-    return this.appSchedulingInfo.getQueueName();
-  }
-
-  /**
-   * Get the list of live containers
-   * @return All of the live containers
-   */
-  @Override
-  public synchronized Collection<RMContainer> getLiveContainers() {
-    return new ArrayList<RMContainer>(liveContainers.values());
-  }
-
-  public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
-    // Cleanup all scheduling information
-    this.isStopped = true;
-    this.appSchedulingInfo.stop(rmAppAttemptFinalState);
-  }
-
-  public synchronized void containerLaunchedOnNode(ContainerId containerId,
-      NodeId nodeId) {
-    // Inform the container
-    RMContainer rmContainer =
-        getRMContainer(containerId);
-    if (rmContainer == null) {
-      // Some unknown container sneaked into the system. Kill it.
-      this.rmContext.getDispatcher().getEventHandler()
-          .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
-      return;
-    }
-
-    rmContainer.handle(new RMContainerEvent(containerId,
-        RMContainerEventType.LAUNCHED));
+    super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
   }
 
   synchronized public boolean containerCompleted(RMContainer rmContainer,
@@ -311,133 +150,6 @@ public class FiCaSchedulerApp extends SchedulerApplication {
     return rmContainer;
   }
 
-  synchronized public List<Container> pullNewlyAllocatedContainers() {
-    List<Container> returnContainerList = new ArrayList<Container>(
-        newlyAllocatedContainers.size());
-    for (RMContainer rmContainer : newlyAllocatedContainers) {
-      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
-          RMContainerEventType.ACQUIRED));
-      returnContainerList.add(rmContainer.getContainer());
-    }
-    newlyAllocatedContainers.clear();
-    return returnContainerList;
-  }
-
-  public Resource getCurrentConsumption() {
-    return this.currentConsumption;
-  }
-
-  synchronized public void showRequests() {
-    if (LOG.isDebugEnabled()) {
-      for (Priority priority : getPriorities()) {
-        Map<String, ResourceRequest> requests = getResourceRequests(priority);
-        if (requests != null) {
-          LOG.debug("showRequests:" + " application=" + getApplicationId() +
-              " headRoom=" + getHeadroom() +
-              " currentConsumption=" + currentConsumption.getMemory());
-          for (ResourceRequest request : requests.values()) {
-            LOG.debug("showRequests:" + " application=" + getApplicationId()
-                + " request=" + request);
-          }
-        }
-      }
-    }
-  }
-
-  public synchronized RMContainer getRMContainer(ContainerId id) {
-    return liveContainers.get(id);
-  }
-
-  synchronized public void resetSchedulingOpportunities(Priority priority) {
-    this.schedulingOpportunities.setCount(priority, 0);
-  }
-
-  synchronized public void addSchedulingOpportunity(Priority priority) {
-    this.schedulingOpportunities.setCount(priority,
-        schedulingOpportunities.count(priority) + 1);
-  }
-
-  synchronized public void subtractSchedulingOpportunity(Priority priority) {
-    int count = schedulingOpportunities.count(priority) - 1;
-    this.schedulingOpportunities.setCount(priority, Math.max(count, 0));
-  }
-
-  /**
-   * @param priority Target priority
-   * @return the number of times the application has been given an opportunity
-   *         to schedule a task at the given priority since the last time it
-   *         successfully did so.
-   */
-  synchronized public int getSchedulingOpportunities(Priority priority) {
-    return this.schedulingOpportunities.count(priority);
-  }
-
-  synchronized void resetReReservations(Priority priority) {
-    this.reReservations.setCount(priority, 0);
-  }
-
-  synchronized void addReReservation(Priority priority) {
-    this.reReservations.add(priority);
-  }
-
-  synchronized public int getReReservations(Priority priority) {
-    return this.reReservations.count(priority);
-  }
-
-  public synchronized int getNumReservedContainers(Priority priority) {
-    Map<NodeId, RMContainer> reservedContainers =
-        this.reservedContainers.get(priority);
-    return (reservedContainers == null) ? 0 : reservedContainers.size();
-  }
-
-  /**
-   * Get total current reservations.
-   * Used only by unit tests
-   * @return total current reservations
-   */
-  @Stable
-  @Private
-  public synchronized Resource getCurrentReservation() {
-    return currentReservation;
-  }
-
-  public synchronized RMContainer reserve(FiCaSchedulerNode node, Priority priority,
-      RMContainer rmContainer, Container container) {
-    // Create RMContainer if necessary
-    if (rmContainer == null) {
-      rmContainer =
-          new RMContainerImpl(container, getApplicationAttemptId(),
-              node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
-              rmContext.getContainerAllocationExpirer());
-
-      Resources.addTo(currentReservation, container.getResource());
-
-      // Reset the re-reservation count
-      resetReReservations(priority);
-    } else {
-      // Note down the re-reservation
-      addReReservation(priority);
-    }
-    rmContainer.handle(new RMContainerReservedEvent(container.getId(),
-        container.getResource(), node.getNodeID(), priority));
-
-    Map<NodeId, RMContainer> reservedContainers =
-        this.reservedContainers.get(priority);
-    if (reservedContainers == null) {
-      reservedContainers = new HashMap<NodeId, RMContainer>();
-      this.reservedContainers.put(priority, reservedContainers);
-    }
-    reservedContainers.put(node.getNodeID(), rmContainer);
-
-    LOG.info("Application " + getApplicationId()
-        + " reserved container " + rmContainer
-        + " on node " + node + ", currently has " + reservedContainers.size()
-        + " at priority " + priority
-        + "; currentReservation " + currentReservation.getMemory());
-
-    return rmContainer;
-  }
-
   public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority) {
     Map<NodeId, RMContainer> reservedContainers =
         this.reservedContainers.get(priority);
@@ -470,22 +182,6 @@ public class FiCaSchedulerApp extends SchedulerApplication {
     return false;
   }
 
-  /**
-   * Has the application reserved the given <code>node</code> at the
-   * given <code>priority</code>?
-   * @param node node to be checked
-   * @param priority priority of reserved container
-   * @return true is reserved, false if not
-   */
-  public synchronized boolean isReserved(FiCaSchedulerNode node, Priority priority) {
-    Map<NodeId, RMContainer> reservedContainers =
-        this.reservedContainers.get(priority);
-    if (reservedContainers != null) {
-      return reservedContainers.containsKey(node.getNodeID());
-    }
-    return false;
-  }
-
   public synchronized float getLocalityWaitFactor(
       Priority priority, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
@@ -497,41 +193,6 @@ public class FiCaSchedulerApp extends SchedulerApplication {
     return Math.min(((float)requiredResources / clusterNodes), 1.0f);
   }
 
-  /**
-   * Get the list of reserved containers
-   * @return All of the reserved containers.
-   */
-  @Override
-  public synchronized List<RMContainer> getReservedContainers() {
-    List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
-    for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
-        this.reservedContainers.entrySet()) {
-      reservedContainers.addAll(e.getValue().values());
-    }
-    return reservedContainers;
-  }
-
-  public synchronized void setHeadroom(Resource globalLimit) {
-    this.resourceLimit = globalLimit;
-  }
-
-  /**
-   * Get available headroom in terms of resources for the application's user.
-   * @return available resource headroom
-   */
-  public synchronized Resource getHeadroom() {
-    // Corner case to deal with applications being slightly over-limit
-    if (resourceLimit.getMemory() < 0) {
-      resourceLimit.setMemory(0);
-    }
-    return resourceLimit;
-  }
-
-  public Queue getQueue() {
-    return queue;
-  }
-
   public Resource getTotalPendingRequests() {
     Resource ret = Resource.newInstance(0, 0);
     for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
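What remains of FiCaSchedulerApp is its constructor, the preemption bookkeeping, and the capacity-scheduler-specific methods; everything else moved up unchanged into SchedulerApplication and is initialized through the `super(...)` call. The refactoring reduces to the classic pull-up-members shape, sketched below with simplified types (none of these names are from the patch):

// Before the refactoring, each subclass duplicated the same fields and
// accessors. Afterwards the common state lives once in the abstract parent.
abstract class SchedulerAppBase {
  protected final String user;
  protected final String queue;

  protected SchedulerAppBase(String user, String queue) {
    this.user = user;
    this.queue = queue;
  }

  public String getUser() { return user; }        // shared, defined once
  public String getQueueName() { return queue; }
}

// Subclasses keep only scheduler-specific behavior and state.
class CapacityApp extends SchedulerAppBase {
  CapacityApp(String user, String queue) {
    super(user, queue);                            // initialize shared state
  }
}

class FairApp extends SchedulerAppBase {
  private long lastPreemptionCheckMs;              // fair-scheduler-only state

  FairApp(String user, String queue) {
    super(user, queue);
  }
}

Moving reserve/isReserved into the parent is also why SchedulerNode gained the abstract getNodeID() above: the shared code keys reservations by NodeId without knowing the concrete node type.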

FSSchedulerApp.java

@ -18,10 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -30,7 +27,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -38,90 +34,37 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting; /**
import com.google.common.collect.HashMultiset; * Represents an application attempt from the viewpoint of the Fair Scheduler.
import com.google.common.collect.Multiset; */
 @Private
 @Unstable
 public class FSSchedulerApp extends SchedulerApplication {
   private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);
-  private final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
-  private final AppSchedulingInfo appSchedulingInfo;
   private AppSchedulable appSchedulable;
-  private final Queue queue;
-  private final Resource currentConsumption = recordFactory
-      .newRecordInstance(Resource.class);
-  private Resource resourceLimit = recordFactory
-      .newRecordInstance(Resource.class);
-  private Map<ContainerId, RMContainer> liveContainers
-      = new HashMap<ContainerId, RMContainer>();
-  private List<RMContainer> newlyAllocatedContainers =
-      new ArrayList<RMContainer>();
-  final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
-      new HashMap<Priority, Map<NodeId, RMContainer>>();
   final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
-  /**
-   * Count how many times the application has been given an opportunity
-   * to schedule a task at each priority. Each time the scheduler
-   * asks the application for a task at this priority, it is incremented,
-   * and each time the application successfully schedules a task, it
-   * is reset to 0.
-   */
-  Multiset<Priority> schedulingOpportunities = HashMultiset.create();
-  Multiset<Priority> reReservations = HashMultiset.create();
-  Resource currentReservation = recordFactory
-      .newRecordInstance(Resource.class);
-  private final RMContext rmContext;
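The schedulingOpportunities multiset removed above is the heart of delay scheduling: each time the scheduler offers the application a node at some priority without an acceptable local match, the count for that priority grows, and a successful placement resets it to zero. A minimal self-contained sketch of that lifecycle using Guava's HashMultiset (the class and method names are illustrative, not the relocated implementation):

    import com.google.common.collect.HashMultiset;
    import com.google.common.collect.Multiset;

    // Illustrative model of the per-priority opportunity counter.
    class DelaySchedulingCounterSketch {
      private final Multiset<Integer> schedulingOpportunities = HashMultiset.create();

      void nodeOffered(int priority) {         // offered a node, nothing placed yet
        schedulingOpportunities.add(priority);
      }

      void containerScheduled(int priority) {  // placement succeeded; count afresh
        schedulingOpportunities.setCount(priority, 0);
      }

      int missedOpportunities(int priority) {  // how long we have waited so far
        return schedulingOpportunities.count(priority);
      }
    }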
   public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
-    this.rmContext = rmContext;
-    this.appSchedulingInfo =
-        new AppSchedulingInfo(applicationAttemptId, user, queue,
-            activeUsersManager);
-    this.queue = queue;
-  }
-  public ApplicationId getApplicationId() {
-    return appSchedulingInfo.getApplicationId();
-  }
-  @Override
-  public ApplicationAttemptId getApplicationAttemptId() {
-    return appSchedulingInfo.getApplicationAttemptId();
+    super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
   }
   public void setAppSchedulable(AppSchedulable appSchedulable) {
@@ -132,83 +75,6 @@ public class FSSchedulerApp extends SchedulerApplication {
     return appSchedulable;
   }
-  public String getUser() {
-    return appSchedulingInfo.getUser();
-  }
-  public synchronized void updateResourceRequests(
-      List<ResourceRequest> requests) {
-    this.appSchedulingInfo.updateResourceRequests(requests);
-  }
-  public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
-    return appSchedulingInfo.getResourceRequests(priority);
-  }
-  public int getNewContainerId() {
-    return appSchedulingInfo.getNewContainerId();
-  }
-  public Collection<Priority> getPriorities() {
-    return appSchedulingInfo.getPriorities();
-  }
-  public ResourceRequest getResourceRequest(Priority priority, String nodeAddress) {
-    return appSchedulingInfo.getResourceRequest(priority, nodeAddress);
-  }
-  public synchronized int getTotalRequiredResources(Priority priority) {
-    return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
-  }
-  public Resource getResource(Priority priority) {
-    return appSchedulingInfo.getResource(priority);
-  }
-  /**
-   * Is this application pending?
-   * @return true if it is, else false.
-   */
-  @Override
-  public boolean isPending() {
-    return appSchedulingInfo.isPending();
-  }
-  public String getQueueName() {
-    return appSchedulingInfo.getQueueName();
-  }
-  /**
-   * Get the list of live containers
-   * @return All of the live containers
-   */
-  @Override
-  public synchronized Collection<RMContainer> getLiveContainers() {
-    return new ArrayList<RMContainer>(liveContainers.values());
-  }
-  public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
-    // Clean up all scheduling information
-    appSchedulingInfo.stop(rmAppAttemptFinalState);
-  }
-  @SuppressWarnings("unchecked")
-  public synchronized void containerLaunchedOnNode(ContainerId containerId,
-      NodeId nodeId) {
-    // Inform the container
-    RMContainer rmContainer =
-        getRMContainer(containerId);
-    if (rmContainer == null) {
-      // Some unknown container sneaked into the system. Kill it.
-      rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
-      return;
-    }
-    rmContainer.handle(new RMContainerEvent(containerId,
-        RMContainerEventType.LAUNCHED));
-  }
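containerLaunchedOnNode above is a small event-driven guard: a launch report for a container the scheduler does not know triggers a node-side cleanup, while a known container is advanced to LAUNCHED. The same decision reduced to a fragment (types simplified to strings, names hypothetical):

    // Unknown ids are told to die; known ids drive the container state machine.
    String action = liveContainers.containsKey(containerId)
        ? "LAUNCHED" : "CLEAN_CONTAINER";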
   synchronized public void containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
@@ -241,122 +107,6 @@ public class FSSchedulerApp extends SchedulerApplication {
     preemptionMap.remove(rmContainer);
   }
-  synchronized public List<Container> pullNewlyAllocatedContainers() {
-    List<Container> returnContainerList = new ArrayList<Container>(
-        newlyAllocatedContainers.size());
-    for (RMContainer rmContainer : newlyAllocatedContainers) {
-      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
-          RMContainerEventType.ACQUIRED));
-      returnContainerList.add(rmContainer.getContainer());
-    }
-    newlyAllocatedContainers.clear();
-    return returnContainerList;
-  }
-  public Resource getCurrentConsumption() {
-    return this.currentConsumption;
-  }
-  synchronized public void showRequests() {
-    if (LOG.isDebugEnabled()) {
-      for (Priority priority : getPriorities()) {
-        Map<String, ResourceRequest> requests = getResourceRequests(priority);
-        if (requests != null) {
-          LOG.debug("showRequests:" + " application=" + getApplicationId() +
-              " headRoom=" + getHeadroom() +
-              " currentConsumption=" + currentConsumption.getMemory());
-          for (ResourceRequest request : requests.values()) {
-            LOG.debug("showRequests:" + " application=" + getApplicationId()
-                + " request=" + request);
-          }
-        }
-      }
-    }
-  }
-  public synchronized RMContainer getRMContainer(ContainerId id) {
-    return liveContainers.get(id);
-  }
-  synchronized public void addSchedulingOpportunity(Priority priority) {
-    schedulingOpportunities.setCount(priority,
-        schedulingOpportunities.count(priority) + 1);
-  }
-  /**
-   * Return the number of times the application has been given an opportunity
-   * to schedule a task at the given priority since the last time it
-   * successfully did so.
-   */
-  synchronized public int getSchedulingOpportunities(Priority priority) {
-    return schedulingOpportunities.count(priority);
-  }
-  synchronized void resetReReservations(Priority priority) {
-    reReservations.setCount(priority, 0);
-  }
-  synchronized void addReReservation(Priority priority) {
-    reReservations.add(priority);
-  }
-  synchronized public int getReReservations(Priority priority) {
-    return reReservations.count(priority);
-  }
-  public synchronized int getNumReservedContainers(Priority priority) {
-    Map<NodeId, RMContainer> reservedContainers =
-        this.reservedContainers.get(priority);
-    return (reservedContainers == null) ? 0 : reservedContainers.size();
-  }
-  /**
-   * Get total current reservations.
-   * Used only by unit tests
-   * @return total current reservations
-   */
-  @VisibleForTesting
-  public synchronized Resource getCurrentReservation() {
-    return currentReservation;
-  }
-  public synchronized RMContainer reserve(FSSchedulerNode node, Priority priority,
-      RMContainer rmContainer, Container container) {
-    // Create RMContainer if necessary
-    if (rmContainer == null) {
-      rmContainer =
-          new RMContainerImpl(container, getApplicationAttemptId(),
-              node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
-              rmContext.getContainerAllocationExpirer());
-      Resources.addTo(currentReservation, container.getResource());
-      // Reset the re-reservation count
-      resetReReservations(priority);
-    } else {
-      // Note down the re-reservation
-      addReReservation(priority);
-    }
-    rmContainer.handle(new RMContainerReservedEvent(container.getId(),
-        container.getResource(), node.getNodeID(), priority));
-    Map<NodeId, RMContainer> reservedContainers =
-        this.reservedContainers.get(priority);
-    if (reservedContainers == null) {
-      reservedContainers = new HashMap<NodeId, RMContainer>();
-      this.reservedContainers.put(priority, reservedContainers);
-    }
-    reservedContainers.put(node.getNodeID(), rmContainer);
-    LOG.info("Application " + getApplicationId()
-        + " reserved container " + rmContainer
-        + " on node " + node + ", currently has " + reservedContainers.size()
-        + " at priority " + priority
-        + "; currentReservation " + currentReservation.getMemory());
-    return rmContainer;
-  }
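The removed reserve() keeps a two-level table, priority to node to container, so an application holds at most one reserved container per (priority, node) pair; a brand-new reservation resets the re-reservation count while a repeat on the same node notes it down. A loose self-contained model of that bookkeeping (String stands in for Priority, NodeId, and RMContainer, and the reset/increment rule is simplified):

    import java.util.HashMap;
    import java.util.Map;

    // Loose model of the priority -> node -> container reservation table.
    class ReservationSketch {
      final Map<String, Map<String, String>> reserved =
          new HashMap<String, Map<String, String>>();
      int reReservations = 0;

      void reserve(String priority, String node, String container) {
        Map<String, String> byNode = reserved.get(priority);
        if (byNode == null) {
          byNode = new HashMap<String, String>();
          reserved.put(priority, byNode);
        }
        if (byNode.put(node, container) == null) {
          reReservations = 0;   // first reservation on this node: reset the count
        } else {
          reReservations++;     // the same node was reserved again: note it down
        }
      }
    }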
   public synchronized void unreserve(FSSchedulerNode node, Priority priority) {
     Map<NodeId, RMContainer> reservedContainers =
         this.reservedContainers.get(priority);
@@ -376,22 +126,6 @@ public class FSSchedulerApp extends SchedulerApplication {
         + priority + "; currentReservation " + currentReservation);
   }
-  /**
-   * Has the application reserved the given <code>node</code> at the
-   * given <code>priority</code>?
-   * @param node node to be checked
-   * @param priority priority of reserved container
-   * @return true if reserved, false if not
-   */
-  public synchronized boolean isReserved(FSSchedulerNode node, Priority priority) {
-    Map<NodeId, RMContainer> reservedContainers =
-        this.reservedContainers.get(priority);
-    if (reservedContainers != null) {
-      return reservedContainers.containsKey(node.getNodeID());
-    }
-    return false;
-  }
   public synchronized float getLocalityWaitFactor(
       Priority priority, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
@@ -403,41 +137,6 @@ public class FSSchedulerApp extends SchedulerApplication {
     return Math.min(((float)requiredResources / clusterNodes), 1.0f);
   }
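The wait factor computed above is simply the fraction of the cluster that the application's pending requests could occupy, clamped at 1. A worked instance of the arithmetic (both sizes are made up):

    // Mirrors the return statement of getLocalityWaitFactor.
    int requiredResources = 8;   // unique hosts + racks in the pending requests
    int clusterNodes = 40;
    float factor = Math.min((float) requiredResources / clusterNodes, 1.0f); // 0.2f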
-  /**
-   * Get the list of reserved containers
-   * @return All of the reserved containers.
-   */
-  @Override
-  public synchronized List<RMContainer> getReservedContainers() {
-    List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
-    for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
-        this.reservedContainers.entrySet()) {
-      reservedContainers.addAll(e.getValue().values());
-    }
-    return reservedContainers;
-  }
-  public synchronized void setHeadroom(Resource globalLimit) {
-    this.resourceLimit = globalLimit;
-  }
-  /**
-   * Get available headroom in terms of resources for the application's user.
-   * @return available resource headroom
-   */
-  public synchronized Resource getHeadroom() {
-    // Corner case to deal with applications being slightly over-limit
-    if (resourceLimit.getMemory() < 0) {
-      resourceLimit.setMemory(0);
-    }
-    return resourceLimit;
-  }
-  public Queue getQueue() {
-    return queue;
-  }
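The headroom getter above clamps a transiently negative limit to zero, so a user slightly over its limit is reported as having no headroom rather than a negative amount. The guard in isolation (the figure is hypothetical):

    int headroomMemory = -512;                   // slightly over the user limit
    int reported = Math.max(headroomMemory, 0);  // report 0, never negative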
   /**
    * Delay scheduling: We often want to prioritize scheduling of node-local
    * containers over rack-local or off-switch containers. To achieve this
@@ -453,26 +152,6 @@ public class FSSchedulerApp extends SchedulerApplication {
   final Map<Priority, NodeType> allowedLocalityLevel = new HashMap<
       Priority, NodeType>();
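allowedLocalityLevel records, per priority, how far locality has been relaxed so far; relaxation loosens one step at a time, from node-local through rack-local to off-switch, until the counters are reset. A minimal sketch of that ordering (the enum mirrors YARN's NodeType values):

    enum Locality { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

    class LocalityRelaxation {
      // Relax one step at a time; never jump from NODE_LOCAL straight to OFF_SWITCH.
      static Locality relax(Locality current) {
        switch (current) {
          case NODE_LOCAL: return Locality.RACK_LOCAL;
          default:         return Locality.OFF_SWITCH;
        }
      }
    }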
-  // Time of the last container scheduled at the current allowed level
-  Map<Priority, Long> lastScheduledContainer = new HashMap<Priority, Long>();
-  /**
-   * Should be called when an application has successfully scheduled a container,
-   * or when the scheduling locality threshold is relaxed.
-   * Reset various internal counters which affect delay scheduling
-   *
-   * @param priority The priority of the container scheduled.
-   */
-  synchronized public void resetSchedulingOpportunities(Priority priority) {
-    resetSchedulingOpportunities(priority, System.currentTimeMillis());
-  }
-  // used for continuous scheduling
-  synchronized public void resetSchedulingOpportunities(Priority priority,
-      long currentTimeMs) {
-    lastScheduledContainer.put(priority, currentTimeMs);
-    schedulingOpportunities.setCount(priority, 0);
-  }
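The two-argument overload removed above exists so that continuous scheduling can stamp many applications with a single clock snapshot, while the one-argument form defaults to the wall clock. Hypothetical call sites, assuming an FSSchedulerApp reference named app and a Priority named priority:

    // Heartbeat-driven path: use the current wall-clock time.
    app.resetSchedulingOpportunities(priority);

    // Continuous-scheduling path: one snapshot shared across applications.
    long now = System.currentTimeMillis();
    app.resetSchedulingOpportunities(priority, now);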
   /**
    * Return the level at which we are allowed to schedule containers, given the
    * current size of the cluster and thresholds indicating how many nodes to