Merge r1293034 through r1293500 from 0.23.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1293501 13f79535-47bb-0310-9956-ffa450edef68
Author: Tsz-wo Sze
Date:   2012-02-25 01:09:01 +00:00
Commit: 0654e46791

23 changed files with 331 additions and 50 deletions


@@ -187,6 +187,11 @@ Release 0.23.2 - UNRELEASED
     dfs.client.block.write.replace-datanode-on-failure.enable should be true.
     (szetszwo)
 
+    HDFS-3008. Negative caching of local addrs doesn't work. (eli)
+
+    HDFS-3006. In WebHDFS, when the return body is empty, set the Content-Type
+    to application/octet-stream instead of application/json. (szetszwo)
+
 Release 0.23.1 - 2012-02-17
 
   INCOMPATIBLE CHANGES


@@ -542,11 +542,12 @@ public class DFSClient implements java.io.Closeable {
   private static boolean isLocalAddress(InetSocketAddress targetAddr) {
     InetAddress addr = targetAddr.getAddress();
     Boolean cached = localAddrMap.get(addr.getHostAddress());
-    if (cached != null && cached) {
+    if (cached != null) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Address " + targetAddr + " is local");
+        LOG.trace("Address " + targetAddr +
+            (cached ? " is local" : " is not local"));
       }
-      return true;
+      return cached;
     }
 
     // Check if the address is any local or loop back
@@ -561,7 +562,8 @@ public class DFSClient implements java.io.Closeable {
       }
     }
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Address " + targetAddr + " is local");
+      LOG.trace("Address " + targetAddr +
+          (local ? " is local" : " is not local"));
     }
     localAddrMap.put(addr.getHostAddress(), local);
     return local;
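
The HDFS-3008 hunk above caches negative lookups as well as positive ones. As a standalone illustration (the class name and main() are mine, not part of the patch), the same pattern looks roughly like this: both outcomes of the local-address check go into the map, so a "not local" datanode address no longer triggers a fresh interface scan on every call.

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LocalAddressCache {
  // Cache of host address -> is-local, holding negative as well as positive results.
  private static final Map<String, Boolean> localAddrMap =
      new ConcurrentHashMap<String, Boolean>();

  public static boolean isLocalAddress(InetAddress addr) {
    Boolean cached = localAddrMap.get(addr.getHostAddress());
    if (cached != null) {
      return cached;                       // hit: positive or negative entry
    }
    // Check whether the address is a local/loopback address or bound to an interface.
    boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
    if (!local) {
      try {
        local = NetworkInterface.getByInetAddress(addr) != null;
      } catch (SocketException e) {
        local = false;
      }
    }
    localAddrMap.put(addr.getHostAddress(), local);  // remember either outcome
    return local;
  }

  public static void main(String[] args) throws Exception {
    InetAddress addr = InetAddress.getByName("127.0.0.1");
    System.out.println(isLocalAddress(addr));  // computed
    System.out.println(isLocalAddress(addr));  // served from the cache
  }
}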


@@ -117,7 +117,7 @@ public class DatanodeWebHdfsMethods {
   @PUT
   @Path("/")
   @Consumes({"*/*"})
-  @Produces({MediaType.APPLICATION_JSON})
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response putRoot(
       final InputStream in,
       @Context final UserGroupInformation ugi,
@@ -147,7 +147,7 @@ public class DatanodeWebHdfsMethods {
   @PUT
   @Path("{" + UriFsPathParam.NAME + ":.*}")
   @Consumes({"*/*"})
-  @Produces({MediaType.APPLICATION_JSON})
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response put(
       final InputStream in,
       @Context final UserGroupInformation ugi,
@@ -209,7 +209,7 @@ public class DatanodeWebHdfsMethods {
       final InetSocketAddress nnHttpAddr = NameNode.getHttpAddress(conf);
       final URI uri = new URI(WebHdfsFileSystem.SCHEME, null,
           nnHttpAddr.getHostName(), nnHttpAddr.getPort(), fullpath, null, null);
-      return Response.created(uri).type(MediaType.APPLICATION_JSON).build();
+      return Response.created(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
@@ -222,7 +222,7 @@ public class DatanodeWebHdfsMethods {
   @POST
   @Path("/")
   @Consumes({"*/*"})
-  @Produces({MediaType.APPLICATION_JSON})
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response postRoot(
       final InputStream in,
       @Context final UserGroupInformation ugi,
@@ -243,7 +243,7 @@ public class DatanodeWebHdfsMethods {
   @POST
   @Path("{" + UriFsPathParam.NAME + ":.*}")
   @Consumes({"*/*"})
-  @Produces({MediaType.APPLICATION_JSON})
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response post(
       final InputStream in,
       @Context final UserGroupInformation ugi,
@@ -287,7 +287,7 @@ public class DatanodeWebHdfsMethods {
         IOUtils.cleanup(LOG, out);
         IOUtils.cleanup(LOG, dfsclient);
       }
-      return Response.ok().type(MediaType.APPLICATION_JSON).build();
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     default:
       throw new UnsupportedOperationException(op + " is not supported");


@@ -215,7 +215,7 @@ public class NamenodeWebHdfsMethods {
   @PUT
   @Path("/")
   @Consumes({"*/*"})
-  @Produces({MediaType.APPLICATION_JSON})
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response putRoot(
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
@@ -263,7 +263,7 @@ public class NamenodeWebHdfsMethods {
   @PUT
   @Path("{" + UriFsPathParam.NAME + ":.*}")
   @Consumes({"*/*"})
-  @Produces({MediaType.APPLICATION_JSON})
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response put(
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
@@ -324,7 +324,7 @@ public class NamenodeWebHdfsMethods {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
           fullpath, op.getValue(), -1L,
           permission, overwrite, bufferSize, replication, blockSize);
-      return Response.temporaryRedirect(uri).build();
+      return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case MKDIRS:
     {
@@ -336,7 +336,7 @@ public class NamenodeWebHdfsMethods {
     {
       np.createSymlink(destination.getValue(), fullpath,
           PermissionParam.getDefaultFsPermission(), createParent.getValue());
-      return Response.ok().type(MediaType.APPLICATION_JSON).build();
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case RENAME:
     {
@@ -348,7 +348,7 @@ public class NamenodeWebHdfsMethods {
       } else {
         np.rename2(fullpath, destination.getValue(),
             s.toArray(new Options.Rename[s.size()]));
-        return Response.ok().type(MediaType.APPLICATION_JSON).build();
+        return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
       }
     }
     case SETREPLICATION:
@@ -364,17 +364,17 @@ public class NamenodeWebHdfsMethods {
       }
 
       np.setOwner(fullpath, owner.getValue(), group.getValue());
-      return Response.ok().type(MediaType.APPLICATION_JSON).build();
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case SETPERMISSION:
     {
       np.setPermission(fullpath, permission.getFsPermission());
-      return Response.ok().type(MediaType.APPLICATION_JSON).build();
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case SETTIMES:
     {
       np.setTimes(fullpath, modificationTime.getValue(), accessTime.getValue());
-      return Response.ok().type(MediaType.APPLICATION_JSON).build();
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case RENEWDELEGATIONTOKEN:
     {
@@ -389,7 +389,7 @@ public class NamenodeWebHdfsMethods {
       final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
       token.decodeFromUrlString(delegationTokenArgument.getValue());
       np.cancelDelegationToken(token);
-      return Response.ok().type(MediaType.APPLICATION_JSON).build();
+      return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
@@ -406,7 +406,7 @@ public class NamenodeWebHdfsMethods {
   @POST
   @Path("/")
   @Consumes({"*/*"})
-  @Produces({MediaType.APPLICATION_JSON})
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response postRoot(
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
@@ -427,7 +427,7 @@ public class NamenodeWebHdfsMethods {
   @POST
   @Path("{" + UriFsPathParam.NAME + ":.*}")
   @Consumes({"*/*"})
-  @Produces({MediaType.APPLICATION_JSON})
+  @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
   public Response post(
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
@@ -459,7 +459,7 @@ public class NamenodeWebHdfsMethods {
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
           fullpath, op.getValue(), -1L, bufferSize);
-      return Response.temporaryRedirect(uri).build();
+      return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
@@ -542,7 +542,7 @@ public class NamenodeWebHdfsMethods {
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
           fullpath, op.getValue(), offset.getValue(), offset, length, bufferSize);
-      return Response.temporaryRedirect(uri).build();
+      return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GET_BLOCK_LOCATIONS:
     {
@@ -578,7 +578,7 @@ public class NamenodeWebHdfsMethods {
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
           fullpath, op.getValue(), -1L);
-      return Response.temporaryRedirect(uri).build();
+      return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETDELEGATIONTOKEN:
     {


@@ -27,6 +27,7 @@ import java.net.URL;
 import java.util.Map;
 
 import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.MediaType;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -314,6 +315,8 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
     conn.setRequestMethod(op.getType().toString());
     conn.connect();
     assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode());
+    assertEquals(0, conn.getContentLength());
+    assertEquals(MediaType.APPLICATION_OCTET_STREAM, conn.getContentType());
 
     assertEquals((short)0755, webhdfs.getFileStatus(dir).getPermission().toShort());
     conn.disconnect();
   }


@@ -36,6 +36,7 @@ Release 0.23.2 - UNRELEASED
   NEW FEATURES
 
   IMPROVEMENTS
 
     MAPREDUCE-3849. Change TokenCache's reading of the binary token file
     (Daryn Sharp via bobby)
@@ -45,9 +46,17 @@ Release 0.23.2 - UNRELEASED
     MAPREDUCE-3877 Add a test to formalise the current state transitions
     of the yarn lifecycle. (stevel)
 
+    MAPREDUCE-3866. Fixed the bin/yarn script to not print the command line
+    unnecessarily. (vinodkv)
+
+    MAPREDUCE-3730. Modified RM to allow restarted NMs to be able to join the
+    cluster without waiting for expiry. (Jason Lowe via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES
 
+    MAPREDUCE-3918 proc_historyserver no longer in command line arguments for
+    HistoryServer (Jon Eagles via bobby)
+
     MAPREDUCE-3862. Nodemanager can appear to hang on shutdown due to lingering
     DeletionService threads (Jason Lowe via bobby)
@@ -74,6 +83,13 @@ Release 0.23.2 - UNRELEASED
     MAPREDUCE-3878. Null user on filtered jobhistory job page (Jonathon Eagles
     via tgraves)
 
+    MAPREDUCE-3738. MM can hang during shutdown if AppLogAggregatorImpl thread
+    dies unexpectedly (Jason Lowe via sseth)
+
+    MAPREDUCE-3904 Job history produced with mapreduce.cluster.acls.enabled
+    false can not be viewed with mapreduce.cluster.acls.enabled true
+    (Jonathon Eagles via tgraves)
+
 Release 0.23.1 - 2012-02-17


@@ -136,4 +136,4 @@ fi
 HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}"
 
 export CLASSPATH
-exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
+exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"


@@ -438,6 +438,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   public boolean checkAccess(UserGroupInformation callerUGI,
       JobACL jobOperation) {
     AccessControlList jobACL = jobACLs.get(jobOperation);
+    if (jobACL == null) {
+      return true;
+    }
     return aclsManager.checkAccess(callerUGI, jobOperation, username, jobACL);
   }


@@ -191,5 +191,16 @@ public class TestJobImpl {
         null, null, null, true, null, 0, null);
     Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+    // Setup configuration access without security enabled
+    Configuration conf5 = new Configuration();
+    conf5.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+    conf5.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
+
+    // Verify access
+    JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
+        null, null, null, true, null, 0, null);
+    Assert.assertTrue(job5.checkAccess(ugi1, null));
+    Assert.assertTrue(job5.checkAccess(ugi2, null));
   }
 }


@@ -330,6 +330,9 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
   boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
     Map<JobACL, AccessControlList> jobACLs = jobInfo.getJobACLs();
     AccessControlList jobACL = jobACLs.get(jobOperation);
+    if (jobACL == null) {
+      return true;
+    }
     return aclsMgr.checkAccess(callerUGI, jobOperation,
         jobInfo.getUsername(), jobACL);
   }
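
The two null checks above (JobImpl and CompletedJob) carry the MAPREDUCE-3904 semantics: a history file written with mapreduce.cluster.acls.enabled set to false records no ACL for the operation, and a missing ACL is treated as unrestricted rather than causing a failure or an implicit deny. A plain-Java sketch of that rule, with simple strings standing in for Hadoop's AccessControlList (all names here are mine):

import java.util.EnumMap;
import java.util.Map;

public class JobAclGuardSketch {
  enum JobOperation { VIEW_JOB, MODIFY_JOB }

  private final Map<JobOperation, String> jobACLs =
      new EnumMap<JobOperation, String>(JobOperation.class);

  boolean checkAccess(String user, JobOperation op) {
    String acl = jobACLs.get(op);
    if (acl == null) {
      return true;   // no ACL recorded for this operation: allow everyone
    }
    // Simplified stand-in for AccessControlList.isUserAllowed().
    return acl.equals("*") || acl.contains(user);
  }

  public static void main(String[] args) {
    JobAclGuardSketch sketch = new JobAclGuardSketch();
    System.out.println(sketch.checkAccess("alice", JobOperation.VIEW_JOB)); // true
  }
}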


@@ -221,6 +221,5 @@ if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
   YARN_OPTS="$YARN_OPTS -Djava.library.path=$JAVA_LIBRARY_PATH"
 fi
 
-echo "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $YARN_OPTS -classpath "$CLASSPATH" $CLASS "$@"
 exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $YARN_OPTS -classpath "$CLASSPATH" $CLASS "$@"
 fi


@@ -133,8 +133,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   }
 
   @Override
+  public void run() {
+    try {
+      doAppLogAggregation();
+    } finally {
+      this.appAggregationFinished.set(true);
+    }
+  }
+
   @SuppressWarnings("unchecked")
-  public void run() {
+  private void doAppLogAggregation() {
     ContainerId containerId;
 
     while (!this.appFinishing.get()) {
@@ -189,8 +197,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     this.dispatcher.getEventHandler().handle(
         new ApplicationEvent(this.appId,
             ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
-
-    this.appAggregationFinished.set(true);
   }
 
   private Path getRemoteNodeTmpLogFileForApp() {
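
The restructuring above wraps the real aggregation work in try/finally so that appAggregationFinished is set even if the worker thread dies with an unexpected exception, which is what MAPREDUCE-3738 fixes. A self-contained sketch of the pattern (class and method names are illustrative, not the patched code):

import java.util.concurrent.atomic.AtomicBoolean;

public class FinallySignalSketch implements Runnable {
  private final AtomicBoolean finished = new AtomicBoolean(false);

  @Override
  public void run() {
    try {
      doWork();
    } finally {
      finished.set(true);   // always signalled, even when doWork() throws
    }
  }

  private void doWork() {
    throw new RuntimeException("simulated aggregation failure");
  }

  public static void main(String[] args) throws InterruptedException {
    FinallySignalSketch task = new FinallySignalSketch();
    Thread worker = new Thread(task);
    worker.start();
    worker.join();
    // Prints true: a shutdown path waiting on the flag is not left hanging.
    System.out.println("finished = " + task.finished.get());
  }
}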


@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
@@ -69,6 +70,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
@@ -536,4 +538,31 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     appAcls.put(ApplicationAccessType.VIEW_APP, "*");
     return appAcls;
   }
+
+  @Test(timeout=20000)
+  @SuppressWarnings("unchecked")
+  public void testStopAfterError() throws Exception {
+    DeletionService delSrvc = mock(DeletionService.class);
+
+    // get the AppLogAggregationImpl thread to crash
+    LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
+    when(mockedDirSvc.getLogDirs()).thenThrow(new RuntimeException());
+
+    DrainDispatcher dispatcher = createDispatcher();
+    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, appEventHandler);
+
+    LogAggregationService logAggregationService =
+        new LogAggregationService(dispatcher, this.context, delSrvc,
+                                  mockedDirSvc);
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+
+    ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+    logAggregationService.handle(new LogHandlerAppStartedEvent(
+        application1, this.user, null,
+        ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+
+    logAggregationService.stop();
+  }
 }


@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@@ -177,17 +178,17 @@ public class ResourceTrackerService extends AbstractService implements
     RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
         resolve(host), capability);
 
-    if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) {
-      LOG.info("Duplicate registration from the node at: " + host
-          + ", Sending SHUTDOWN Signal to the NodeManager");
-      regResponse.setNodeAction(NodeAction.SHUTDOWN);
-      response.setRegistrationResponse(regResponse);
-      return response;
+    RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
+    if (oldNode == null) {
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
+    } else {
+      LOG.info("Reconnect from the node at: " + host);
+      this.nmLivelinessMonitor.unregister(nodeId);
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMNodeReconnectEvent(nodeId, rmNode));
     }
-
-    this.rmContext.getDispatcher().getEventHandler().handle(
-        new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
 
     this.nmLivelinessMonitor.register(nodeId);
     LOG.info("NodeManager from node " + host + "(cmPort: " + cmPort


@@ -28,6 +28,7 @@ public enum RMNodeEventType {
   // ResourceTrackerService
   STATUS_UPDATE,
   REBOOTING,
+  RECONNECTED,
 
   // Source: Application
   CLEANUP_APP,


@@ -110,9 +110,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
                                            RMNodeEventType,
                                            RMNodeEvent>(RMNodeState.NEW)
 
-     //Transitions from RUNNING state
+     //Transitions from NEW state
      .addTransition(RMNodeState.NEW, RMNodeState.RUNNING,
          RMNodeEventType.STARTED, new AddNodeTransition())
+
+     //Transitions from RUNNING state
      .addTransition(RMNodeState.RUNNING,
          EnumSet.of(RMNodeState.RUNNING, RMNodeState.UNHEALTHY),
          RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
@@ -129,11 +131,15 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
          RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
      .addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING,
          RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
+     .addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING,
+         RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
 
      //Transitions from UNHEALTHY state
      .addTransition(RMNodeState.UNHEALTHY,
          EnumSet.of(RMNodeState.UNHEALTHY, RMNodeState.RUNNING),
          RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition())
+     .addTransition(RMNodeState.UNHEALTHY, RMNodeState.UNHEALTHY,
+         RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
 
      // create the topology tables
      .installTopology();
@@ -372,6 +378,39 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
   }
 
+  public static class ReconnectNodeTransition implements
+      SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+
+    @Override
+    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      // Kill containers since node is rejoining.
+      rmNode.context.getDispatcher().getEventHandler().handle(
+          new NodeRemovedSchedulerEvent(rmNode));
+
+      RMNode newNode = ((RMNodeReconnectEvent)event).getReconnectedNode();
+      if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
+          && rmNode.getHttpPort() == newNode.getHttpPort()) {
+        // Reset heartbeat ID since node just restarted.
+        rmNode.getLastHeartBeatResponse().setResponseId(0);
+        rmNode.context.getDispatcher().getEventHandler().handle(
+            new NodeAddedSchedulerEvent(rmNode));
+      } else {
+        // Reconnected node differs, so replace old node and start new node
+        switch (rmNode.getState()) {
+        case RUNNING:
+          ClusterMetrics.getMetrics().decrNumActiveNodes();
+          break;
+        case UNHEALTHY:
+          ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
+          break;
+        }
+        rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
+        rmNode.context.getDispatcher().getEventHandler().handle(
+            new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED));
+      }
+    }
+  }
+
   public static class CleanUpAppTransition
     implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {


@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class RMNodeReconnectEvent extends RMNodeEvent {
+  private RMNode reconnectedNode;
+
+  public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode) {
+    super(nodeId, RMNodeEventType.RECONNECTED);
+    reconnectedNode = newNode;
+  }
+
+  public RMNode getReconnectedNode() {
+    return reconnectedNode;
+  }
+}


@@ -666,7 +666,10 @@ implements ResourceScheduler, CapacitySchedulerContext {
 
   private synchronized void removeNode(RMNode nodeInfo) {
     SchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
-    Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
+    if (node == null) {
+      return;
+    }
+    Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
     root.updateClusterResource(clusterResource);
     --numNodeManagers;


@@ -731,6 +731,9 @@ public class FifoScheduler implements ResourceScheduler {
 
   private synchronized void removeNode(RMNode nodeInfo) {
     SchedulerNode node = getNode(nodeInfo.getNodeID());
+    if (node == null) {
+      return;
+    }
     // Kill running containers
     for(RMContainer container : node.getRunningContainers()) {
       containerCompleted(container,
@@ -744,7 +747,7 @@ public class FifoScheduler implements ResourceScheduler {
     this.nodes.remove(nodeInfo.getNodeID());
 
     // Update cluster metrics
-    Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
+    Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
   }
 
   @Override
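
Both scheduler hunks above add the same guard: removeNode() returns early when the scheduler has no record of the node (for example when a reconnect already replaced it), and the capacity subtracted is the one tracked at add time rather than whatever the incoming RMNode reports. A toy sketch of that bookkeeping, with all types and numbers illustrative rather than taken from the schedulers:

import java.util.HashMap;
import java.util.Map;

public class ClusterResourceSketch {
  private final Map<String, Integer> trackedMemoryMb = new HashMap<String, Integer>();
  private int clusterMemoryMb = 0;

  synchronized void addNode(String nodeId, int memoryMb) {
    trackedMemoryMb.put(nodeId, memoryMb);
    clusterMemoryMb += memoryMb;
  }

  synchronized void removeNode(String nodeId) {
    Integer tracked = trackedMemoryMb.remove(nodeId);
    if (tracked == null) {
      return;                     // never added (or already removed): nothing to undo
    }
    clusterMemoryMb -= tracked;   // subtract what was added, not a caller-supplied value
  }

  public static void main(String[] args) {
    ClusterResourceSketch sketch = new ClusterResourceSketch();
    sketch.addNode("host1:1234", 4096);
    sketch.removeNode("host1:1234");
    sketch.removeNode("host1:1234");   // second removal is a no-op, no underflow
    System.out.println(sketch.clusterMemoryMb);  // 0
  }
}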


@@ -19,23 +19,18 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 
 import com.google.common.collect.Lists;
@@ -195,8 +190,12 @@ public class MockNodes {
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode, RMNodeState state, String httpAddr) {
+    return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++);
+  }
+
+  private static RMNode buildRMNode(int rack, final Resource perNode, RMNodeState state, String httpAddr, int hostnum) {
     final String rackName = "rack"+ rack;
-    final int nid = NODE_ID++;
+    final int nid = hostnum;
     final String hostName = "host"+ nid;
     final int port = 123;
     final NodeId nodeID = newNodeID(hostName, port);
@@ -219,4 +218,8 @@ public class MockNodes {
   public static RMNode newNodeInfo(int rack, final Resource perNode) {
     return buildRMNode(rack, perNode, RMNodeState.RUNNING, "localhost:0");
   }
+
+  public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum) {
+    return buildRMNode(rack, perNode, null, "localhost:0", hostnum);
+  }
 }


@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import junit.framework.Assert;
@@ -27,10 +28,17 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -167,10 +175,37 @@ public class TestFifoScheduler {
     testMinimumAllocation(conf);
   }
 
+  @Test
+  public void testReconnectedNode() throws Exception {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    conf.setQueues("default", new String[] {"default"});
+    conf.setCapacity("default", 100);
+    FifoScheduler fs = new FifoScheduler();
+    fs.reinitialize(conf, null, null);
+
+    RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
+    RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
+
+    fs.handle(new NodeAddedSchedulerEvent(n1));
+    fs.handle(new NodeAddedSchedulerEvent(n2));
+    List<ContainerStatus> emptyList = new ArrayList<ContainerStatus>();
+    fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList));
+    Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB());
+
+    // reconnect n1 with downgraded memory
+    n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
+    fs.handle(new NodeRemovedSchedulerEvent(n1));
+    fs.handle(new NodeAddedSchedulerEvent(n1));
+    fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList));
+
+    Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
+  }
+
   public static void main(String[] args) throws Exception {
     TestFifoScheduler t = new TestFifoScheduler();
     t.test();
     t.testDefaultMinimumAllocation();
     t.testNonDefaultMinimumAllocation();
+    t.testReconnectedNode();
   }
 }


@@ -31,12 +31,17 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
 import org.junit.Test;
@@ -189,7 +194,7 @@ public class TestResourceTrackerService {
     conf.set("yarn.resourcemanager.nodes.exclude-path", hostFile
         .getAbsolutePath());
 
-    MockRM rm = new MockRM(conf);
+    rm = new MockRM(conf);
     rm.start();
 
     MockNM nm1 = rm.registerNode("host1:1234", 5120);
@@ -223,6 +228,61 @@ public class TestResourceTrackerService {
         ClusterMetrics.getMetrics().getUnhealthyNMs());
   }
 
+  @Test
+  public void testReconnectNode() throws Exception {
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm = new MockRM() {
+      @Override
+      protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+        return new SchedulerEventDispatcher(this.scheduler) {
+          @Override
+          public void handle(SchedulerEvent event) {
+            scheduler.handle(event);
+          }
+        };
+      }
+
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    MockNM nm2 = rm.registerNode("host2:5678", 5120);
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(false);
+    checkUnealthyNMCount(rm, nm2, true, 1);
+    final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs();
+    QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
+    Assert.assertEquals(5120 + 5120, metrics.getAvailableMB());
+
+    // reconnect of healthy node
+    nm1 = rm.registerNode("host1:1234", 5120);
+    HeartbeatResponse response = nm1.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
+    dispatcher.await();
+    Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
+    checkUnealthyNMCount(rm, nm2, true, 1);
+
+    // reconnect of unhealthy node
+    nm2 = rm.registerNode("host2:5678", 5120);
+    response = nm2.nodeHeartbeat(false);
+    Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
+    dispatcher.await();
+    Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
+    checkUnealthyNMCount(rm, nm2, true, 1);
+
+    // reconnect of node with changed capability
+    nm1 = rm.registerNode("host2:5678", 10240);
+    dispatcher.await();
+    response = nm2.nodeHeartbeat(true);
+    dispatcher.await();
+    Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
+    Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
+  }
+
   private void writeToHostsFile(String... hosts) throws IOException {
     if (!hostFile.exists()) {
       TEMP_DIR.mkdirs();

@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
@@ -41,12 +42,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 public class TestCapacityScheduler {
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
+  private final int GB = 1024;
 
   private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
   private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
@@ -97,8 +101,6 @@ public class TestCapacityScheduler {
 
     LOG.info("--- START: testCapacityScheduler ---");
 
-    final int GB = 1024;
-
     // Register node1
     String host_0 = "host_0";
     org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
@@ -340,4 +342,27 @@
     cs.reinitialize(conf, null, null);
   }
 
+  @Test
+  public void testReconnectedNode() throws Exception {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.reinitialize(csConf, null, null);
+
+    RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
+    RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
+
+    cs.handle(new NodeAddedSchedulerEvent(n1));
+    cs.handle(new NodeAddedSchedulerEvent(n2));
+
+    Assert.assertEquals(6 * GB, cs.getClusterResources().getMemory());
+
+    // reconnect n1 with downgraded memory
+    n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
+    cs.handle(new NodeRemovedSchedulerEvent(n1));
+    cs.handle(new NodeAddedSchedulerEvent(n1));
+
+    Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory());
+  }
 }