diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 27374b67577..93344e2a3ab 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -22,6 +22,8 @@ Release 2.5.0 - UNRELEASED NEW FEATURES IMPROVEMENTS + YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via + jeagles) OPTIMIZATIONS diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 0c56134b811..8efceccaa90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -403,6 +403,15 @@ public class ApplicationMasterService extends AbstractService implements return resync; } + //filter illegal progress values + float filteredProgress = request.getProgress(); + if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY + || filteredProgress < 0) { + request.setProgress(0); + } else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) { + request.setProgress(1); + } + // Send the status update to the appAttempt. this.rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptStatusupdateEvent(appAttemptId, request diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 2e65b0c80f5..3b8d69f7958 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -197,6 +197,29 @@ public class MockAM { } } + public AllocateResponse allocate(AllocateRequest allocateRequest) + throws Exception { + final AllocateRequest req = allocateRequest; + req.setResponseId(++responseId); + + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(attemptId.toString()); + Token token = + context.getRMApps().get(attemptId.getApplicationId()) + .getRMAppAttempt(attemptId).getAMRMToken(); + ugi.addTokenIdentifier(token.decodeIdentifier()); + try { + return ugi.doAs(new PrivilegedExceptionAction() { + @Override + public AllocateResponse run() throws Exception { + return amRMProtocol.allocate(req); + } + }); + } catch (UndeclaredThrowableException e) { + throw (Exception) e.getCause(); + } + } + public void unregisterAppAttempt() throws Exception { waitForState(RMAppAttemptState.RUNNING); final FinishApplicationMasterRequest req = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 4025ab76b52..76c999d8494 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -18,23 +18,58 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import com.google.common.collect.Maps; import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; + +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentMap; + +import static java.lang.Thread.sleep; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyList; +import static org.mockito.Mockito.*; public class TestApplicationMasterService { private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); @@ -73,7 +108,7 @@ public class TestApplicationMasterService { nm1.nodeHeartbeat(true); while (alloc1Response.getAllocatedContainers().size() < 1) { LOG.info("Waiting for containers to be created for app 1..."); - Thread.sleep(1000); + sleep(1000); alloc1Response = am1.schedule(); } @@ -113,7 +148,7 @@ public class TestApplicationMasterService { nm1.nodeHeartbeat(true); while (alloc1Response.getAllocatedContainers().size() < 1) { LOG.info("Waiting for containers to be created for app 1..."); - Thread.sleep(1000); + sleep(1000); alloc1Response = am1.schedule(); } @@ -145,4 +180,70 @@ public class TestApplicationMasterService { } } } + + @Test(timeout=1200000) + public void testProgressFilter() throws Exception{ + MockRM rm = new MockRM(conf); + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(2048); + + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + am1.setAMRMProtocol(rm.getApplicationMasterService()); + + AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); + List release = new ArrayList(); + List ask = new ArrayList(); + allocateRequest.setReleaseList(release); + allocateRequest.setAskList(ask); + + allocateRequest.setProgress(Float.POSITIVE_INFINITY); + am1.allocate(allocateRequest); + while(attempt1.getProgress()!=1){ + LOG.info("Waiting for allocate event to be handled ..."); + sleep(100); + } + + allocateRequest.setProgress(Float.NaN); + am1.allocate(allocateRequest); + while(attempt1.getProgress()!=0){ + LOG.info("Waiting for allocate event to be handled ..."); + sleep(100); + } + + allocateRequest.setProgress((float)9); + am1.allocate(allocateRequest); + while(attempt1.getProgress()!=1){ + LOG.info("Waiting for allocate event to be handled ..."); + sleep(100); + } + + allocateRequest.setProgress(Float.NEGATIVE_INFINITY); + am1.allocate(allocateRequest); + while(attempt1.getProgress()!=0){ + LOG.info("Waiting for allocate event to be handled ..."); + sleep(100); + } + + allocateRequest.setProgress((float)0.5); + am1.allocate(allocateRequest); + while(attempt1.getProgress()!=0.5){ + LOG.info("Waiting for allocate event to be handled ..."); + sleep(100); + } + + allocateRequest.setProgress((float)-1); + am1.allocate(allocateRequest); + while(attempt1.getProgress()!=0){ + LOG.info("Waiting for allocate event to be handled ..."); + sleep(100); + } + } }