YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via jeagles)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1569859 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Turner Eagles 2014-02-19 17:58:04 +00:00
parent 0bbc1f4d95
commit 07c3667c84
4 changed files with 140 additions and 5 deletions

View File

@ -7,6 +7,8 @@ Release 2.5.0 - UNRELEASED
NEW FEATURES
IMPROVEMENTS
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
jeagles)
OPTIMIZATIONS

View File

@ -402,6 +402,15 @@ public class ApplicationMasterService extends AbstractService implements
return resync;
}
//filter illegal progress values
float filteredProgress = request.getProgress();
if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY
|| filteredProgress < 0) {
request.setProgress(0);
} else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) {
request.setProgress(1);
}
// Send the status update to the appAttempt.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptStatusupdateEvent(appAttemptId, request

View File

@ -197,6 +197,29 @@ public class MockAM {
}
}
public AllocateResponse allocate(AllocateRequest allocateRequest)
throws Exception {
final AllocateRequest req = allocateRequest;
req.setResponseId(++responseId);
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(attemptId.toString());
Token<AMRMTokenIdentifier> token =
context.getRMApps().get(attemptId.getApplicationId())
.getRMAppAttempt(attemptId).getAMRMToken();
ugi.addTokenIdentifier(token.decodeIdentifier());
try {
return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
@Override
public AllocateResponse run() throws Exception {
return amRMProtocol.allocate(req);
}
});
} catch (UndeclaredThrowableException e) {
throw (Exception) e.getCause();
}
}
public void unregisterAppAttempt() throws Exception {
waitForState(RMAppAttemptState.RUNNING);
final FinishApplicationMasterRequest req =

View File

@ -18,23 +18,58 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.collect.Maps;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import static java.lang.Thread.sleep;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyList;
import static org.mockito.Mockito.*;
public class TestApplicationMasterService {
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@ -73,7 +108,7 @@ public class TestApplicationMasterService {
nm1.nodeHeartbeat(true);
while (alloc1Response.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(1000);
sleep(1000);
alloc1Response = am1.schedule();
}
@ -113,7 +148,7 @@ public class TestApplicationMasterService {
nm1.nodeHeartbeat(true);
while (alloc1Response.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(1000);
sleep(1000);
alloc1Response = am1.schedule();
}
@ -145,4 +180,70 @@ public class TestApplicationMasterService {
}
}
}
@Test(timeout=1200000)
public void testProgressFilter() throws Exception{
MockRM rm = new MockRM(conf);
rm.start();
// Register node1
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
// Submit an application
RMApp app1 = rm.submitApp(2048);
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
am1.setAMRMProtocol(rm.getApplicationMasterService());
AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
List<ContainerId> release = new ArrayList<ContainerId>();
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
allocateRequest.setReleaseList(release);
allocateRequest.setAskList(ask);
allocateRequest.setProgress(Float.POSITIVE_INFINITY);
am1.allocate(allocateRequest);
while(attempt1.getProgress()!=1){
LOG.info("Waiting for allocate event to be handled ...");
sleep(100);
}
allocateRequest.setProgress(Float.NaN);
am1.allocate(allocateRequest);
while(attempt1.getProgress()!=0){
LOG.info("Waiting for allocate event to be handled ...");
sleep(100);
}
allocateRequest.setProgress((float)9);
am1.allocate(allocateRequest);
while(attempt1.getProgress()!=1){
LOG.info("Waiting for allocate event to be handled ...");
sleep(100);
}
allocateRequest.setProgress(Float.NEGATIVE_INFINITY);
am1.allocate(allocateRequest);
while(attempt1.getProgress()!=0){
LOG.info("Waiting for allocate event to be handled ...");
sleep(100);
}
allocateRequest.setProgress((float)0.5);
am1.allocate(allocateRequest);
while(attempt1.getProgress()!=0.5){
LOG.info("Waiting for allocate event to be handled ...");
sleep(100);
}
allocateRequest.setProgress((float)-1);
am1.allocate(allocateRequest);
while(attempt1.getProgress()!=0){
LOG.info("Waiting for allocate event to be handled ...");
sleep(100);
}
}
}