YARN-2209. Replaced AM resync/shutdown command with corresponding exceptions and made related MR changes. Contributed by Jian He.

(cherry picked from commit 0f3b6900be)
This commit is contained in:
Zhijie Shen 2014-10-23 21:56:03 -07:00
parent 193724ccaf
commit 3b03ea6b50
23 changed files with 196 additions and 215 deletions

View File

@ -36,12 +36,13 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -98,11 +99,24 @@ public class LocalContainerAllocator extends RMCommunicator
AllocateRequest.newInstance(this.lastResponseID,
super.getApplicationProgress(), new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>(), null);
AllocateResponse allocateResponse;
try {
allocateResponse = scheduler.allocate(allocateRequest);
scheduler.allocate(allocateRequest);
// Reset retry count if no exception occurred.
retrystartTime = System.currentTimeMillis();
} catch (ApplicationAttemptNotFoundException e) {
LOG.info("Event from RM: shutting down Application Master");
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new YarnRuntimeException(
"Resource Manager doesn't recognize AttemptId: "
+ this.getContext().getApplicationID(), e);
} catch (ApplicationMasterNotRegisteredException e) {
LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+ " hence resync and send outstanding requests.");
this.lastResponseID = 0;
register();
} catch (Exception e) {
// This can happen when the connection to the RM has gone down. Keep
// re-trying until the retryInterval has expired.
@ -117,29 +131,6 @@ public class LocalContainerAllocator extends RMCommunicator
// continue to attempt to contact the RM.
throw e;
}
if (allocateResponse.getAMCommand() != null) {
switch(allocateResponse.getAMCommand()) {
case AM_RESYNC:
LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+ " hence resyncing.");
this.lastResponseID = 0;
register();
break;
case AM_SHUTDOWN:
LOG.info("Event from RM: shutting down Application Master");
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
this.getContext().getApplicationID());
default:
String msg =
"Unhandled value of AMCommand: " + allocateResponse.getAMCommand();
LOG.error(msg);
throw new YarnRuntimeException(msg);
}
}
}
@SuppressWarnings("unchecked")

View File

@ -74,6 +74,8 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@ -240,7 +242,7 @@ public class RMContainerAllocator extends RMContainerRequestor
protected synchronized void heartbeat() throws Exception {
scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
List<Container> allocatedContainers = getResources();
if (allocatedContainers.size() > 0) {
if (allocatedContainers != null && allocatedContainers.size() > 0) {
scheduledRequests.assign(allocatedContainers);
}
@ -665,6 +667,22 @@ public class RMContainerAllocator extends RMContainerRequestor
response = makeRemoteRequest();
// Reset retry count if no exception occurred.
retrystartTime = System.currentTimeMillis();
} catch (ApplicationAttemptNotFoundException e ) {
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new YarnRuntimeException(
"Resource Manager doesn't recognize AttemptId: "
+ this.getContext().getApplicationID(), e);
} catch (ApplicationMasterNotRegisteredException e) {
LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+ " hence resync and send outstanding requests.");
// RM may have restarted, re-register with RM.
lastResponseID = 0;
register();
addOutstandingRequestOnResync();
return null;
} catch (Exception e) {
// This can happen when the connection to the RM has gone down. Keep
// re-trying until the retryInterval has expired.
@ -679,32 +697,6 @@ public class RMContainerAllocator extends RMContainerRequestor
// continue to attempt to contact the RM.
throw e;
}
if (response.getAMCommand() != null) {
switch(response.getAMCommand()) {
case AM_RESYNC:
LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+ " hence resyncing.");
lastResponseID = 0;
// Registering to allow RM to discover an active AM for this
// application
register();
addOutstandingRequestOnResync();
break;
case AM_SHUTDOWN:
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
this.getContext().getApplicationID());
default:
String msg =
"Unhandled value of AMCommand: " + response.getAMCommand();
LOG.error(msg);
throw new YarnRuntimeException(msg);
}
}
Resource newHeadRoom =
getAvailableResources() == null ? Resources.none()
: getAvailableResources();

View File

@ -29,7 +29,6 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -40,7 +39,6 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -51,6 +49,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import com.google.common.annotations.VisibleForTesting;
/**
* Keeps the data structures to send container requests to RM.
@ -176,7 +176,8 @@ public abstract class RMContainerRequestor extends RMCommunicator {
LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
}
protected AllocateResponse makeRemoteRequest() throws IOException {
protected AllocateResponse makeRemoteRequest() throws YarnException,
IOException {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
new ArrayList<String>(blacklistRemovals));
@ -184,16 +185,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
AllocateRequest.newInstance(lastResponseID,
super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
new ArrayList<ContainerId>(release), blacklistRequest);
AllocateResponse allocateResponse;
try {
allocateResponse = scheduler.allocate(allocateRequest);
} catch (YarnException e) {
throw new IOException(e);
}
if (isResyncCommand(allocateResponse)) {
return allocateResponse;
}
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
lastResponseID = allocateResponse.getResponseId();
availableResources = allocateResponse.getAvailableResources();
lastClusterNmCount = clusterNmCount;
@ -222,11 +214,6 @@ public abstract class RMContainerRequestor extends RMCommunicator {
return allocateResponse;
}
protected boolean isResyncCommand(AllocateResponse allocateResponse) {
return allocateResponse.getAMCommand() != null
&& allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
}
protected void addOutstandingRequestOnResync() {
for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable
.values()) {

View File

@ -31,15 +31,17 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -50,6 +52,9 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
@ -86,9 +91,11 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
@ -102,10 +109,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -1749,14 +1758,11 @@ public class TestRMContainerAllocator {
}
@Override
protected AllocateResponse makeRemoteRequest() throws IOException {
protected AllocateResponse makeRemoteRequest() throws IOException,
YarnException {
allocateResponse = super.makeRemoteRequest();
return allocateResponse;
}
public boolean isResyncCommand() {
return super.isResyncCommand(allocateResponse);
}
}
@Test
@ -2250,8 +2256,6 @@ public class TestRMContainerAllocator {
// send allocate request to 2nd RM and get resync command
allocator.schedule();
dispatcher.await();
Assert.assertTrue("Last allocate response is not RESYNC",
allocator.isResyncCommand());
// Step-5 : On Resync,AM sends all outstanding
// asks,release,blacklistAaddition

View File

@ -358,6 +358,9 @@ Release 2.6.0 - UNRELEASED
YARN-2709. Made timeline client getDelegationToken API retry if ConnectException
happens. (Li Lu via zjshen)
YARN-2209. Replaced AM resync/shutdown command with corresponding exceptions and
made related MR changes. (Jian He via zjshen)
OPTIMIZATIONS
BUG FIXES

View File

@ -20,7 +20,11 @@ package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
/**
* Command sent by the Resource Manager to the Application Master in the
@ -30,16 +34,26 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@Public
@Unstable
public enum AMCommand {
/**
* Sent by Resource Manager when it is out of sync with the AM and wants the
* AM get back in sync.
* @deprecated Sent by Resource Manager when it is out of sync with the AM and
* wants the AM get back in sync.
*
* Note: Instead of sending this command,
* {@link ApplicationMasterNotRegisteredException} will be thrown
* when ApplicationMaster is out of sync with ResourceManager and
* ApplicationMaster is expected to re-register with RM by calling
* {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}
*/
AM_RESYNC,
/**
* Sent by Resource Manager when it wants the AM to shutdown. Eg. when the
* node is going down for maintenance. The AM should save any state and
* prepare to be restarted at a later time.
* @deprecated Sent by Resource Manager when it wants the AM to shutdown.
* Note: This command was earlier sent by ResourceManager to
* instruct AM to shutdown if RM had restarted. Now
* {@link ApplicationAttemptNotFoundException} will be thrown in case
* that RM has restarted and AM is supposed to handle this
* exception by shutting down itself.
*/
AM_SHUTDOWN
}

View File

@ -21,12 +21,16 @@ package org.apache.hadoop.yarn.exceptions;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
/**
* This exception is thrown on
* {@link ApplicationHistoryProtocol#getApplicationAttemptReport (GetApplicationAttemptReportRequest)}
* API when the Application Attempt doesn't exist in Application History Server
* API when the Application Attempt doesn't exist in Application History Server or
* {@link ApplicationMasterProtocol#allocate(AllocateRequest)} if application
* doesn't exist in RM.
*/
@Public
@Unstable

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -43,6 +42,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -222,6 +222,10 @@ extends AMRMClientAsync<T> {
try {
response = client.allocate(progress);
} catch (ApplicationAttemptNotFoundException e) {
handler.onShutdownRequest();
LOG.info("Shutdown requested. Stopping callback.");
return;
} catch (Throwable ex) {
LOG.error("Exception on heartbeat", ex);
savedException = ex;
@ -229,21 +233,17 @@ extends AMRMClientAsync<T> {
handlerThread.interrupt();
return;
}
}
if (response != null) {
while (true) {
try {
responseQueue.put(response);
if (response.getAMCommand() == AMCommand.AM_SHUTDOWN) {
return;
if (response != null) {
while (true) {
try {
responseQueue.put(response);
break;
} catch (InterruptedException ex) {
LOG.debug("Interrupted while waiting to put on response queue", ex);
}
break;
} catch (InterruptedException ex) {
LOG.debug("Interrupted while waiting to put on response queue", ex);
}
}
}
try {
Thread.sleep(heartbeatIntervalMs.get());
} catch (InterruptedException ex) {
@ -276,20 +276,6 @@ extends AMRMClientAsync<T> {
LOG.info("Interrupted while waiting for queue", ex);
continue;
}
if (response.getAMCommand() != null) {
switch(response.getAMCommand()) {
case AM_SHUTDOWN:
handler.onShutdownRequest();
LOG.info("Shutdown requested. Stopping callback.");
return;
default:
String msg =
"Unhandled value of RM AMCommand: " + response.getAMCommand();
LOG.error(msg);
throw new YarnRuntimeException(msg);
}
}
List<NodeReport> updatedNodes = response.getUpdatedNodes();
if (!updatedNodes.isEmpty()) {
handler.onNodesUpdated(updatedNodes);

View File

@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -275,15 +274,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
blacklistRemovals.clear();
}
allocateResponse = rmClient.allocate(allocateRequest);
if (isResyncCommand(allocateResponse)) {
try {
allocateResponse = rmClient.allocate(allocateRequest);
} catch (ApplicationMasterNotRegisteredException e) {
LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+ " hence resyncing.");
synchronized (this) {
release.addAll(this.pendingRelease);
blacklistAdditions.addAll(this.blacklistedNodes);
for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr : remoteRequestsTable
.values()) {
.values()) {
for (Map<Resource, ResourceRequestInfo> capabalities : rr.values()) {
for (ResourceRequestInfo request : capabalities.values()) {
addResourceRequestToAsk(request.remoteRequest);
@ -293,7 +293,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
// re register with RM
registerApplicationMaster();
return allocate(progressIndicator);
allocateResponse = allocate(progressIndicator);
return allocateResponse;
}
synchronized (this) {
@ -349,11 +350,6 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
}
private boolean isResyncCommand(AllocateResponse allocateResponse) {
return allocateResponse.getAMCommand() != null
&& allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
}
@Private
@VisibleForTesting
protected void populateNMTokens(List<NMToken> nmTokens) {

View File

@ -827,7 +827,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
return AllocateResponse.newInstance(-1,
new ArrayList<ContainerStatus>(),
new ArrayList<Container>(), new ArrayList<NodeReport>(),
Resource.newInstance(1024, 2), AMCommand.AM_RESYNC, 1,
Resource.newInstance(1024, 2), null, 1,
null, new ArrayList<NMToken>());
}
}

View File

@ -18,16 +18,15 @@
package org.apache.hadoop.yarn.client.api.async.impl;
import com.google.common.base.Supplier;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
@ -36,13 +35,10 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@ -56,12 +52,16 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Supplier;
public class TestAMRMClientAsync {
private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class);
@ -211,10 +211,10 @@ public class TestAMRMClientAsync {
@SuppressWarnings("unchecked")
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
final AllocateResponse shutDownResponse = createAllocateResponse(
new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN);
when(client.allocate(anyFloat())).thenReturn(shutDownResponse);
createAllocateResponse(new ArrayList<ContainerStatus>(),
new ArrayList<Container>(), null);
when(client.allocate(anyFloat())).thenThrow(
new ApplicationAttemptNotFoundException("app not found, shut down"));
AMRMClientAsync<ContainerRequest> asyncClient =
AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler);
@ -235,11 +235,8 @@ public class TestAMRMClientAsync {
final TestCallbackHandler callbackHandler = new TestCallbackHandler();
@SuppressWarnings("unchecked")
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
final AllocateResponse shutDownResponse = createAllocateResponse(
new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN);
when(client.allocate(anyFloat())).thenReturn(shutDownResponse);
when(client.allocate(anyFloat())).thenThrow(
new ApplicationAttemptNotFoundException("app not found, shut down"));
AMRMClientAsync<ContainerRequest> asyncClient =
AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler);

View File

@ -932,7 +932,7 @@ public class TestAMRMClient {
Assert.assertNotEquals(amrmToken_1, amrmToken_2);
// can do the allocate call with latest AMRMToken
amClient.allocate(0.1f);
AllocateResponse response = amClient.allocate(0.1f);
// Verify latest AMRMToken can be used to send allocation request.
UserGroupInformation testUser1 =
@ -953,7 +953,8 @@ public class TestAMRMClient {
.getResourceManager().getApplicationMasterService().getBindAddress());
testUser1.addToken(newVersionToken);
AllocateRequest request = Records.newRecord(AllocateRequest.class);
request.setResponseId(response.getResponseId());
testUser1.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
@Override
public ApplicationMasterProtocol run() {
@ -962,7 +963,7 @@ public class TestAMRMClient {
yarnCluster.getResourceManager().getApplicationMasterService()
.getBindAddress(), conf);
}
}).allocate(Records.newRecord(AllocateRequest.class));
}).allocate(request);
// Make sure previous token has been rolled-over
// and can not use this rolled-over token to make a allocate all.

View File

@ -51,11 +51,10 @@ import org.apache.hadoop.yarn.proto.YarnProtos.QueueStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import com.google.protobuf.ByteString;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
@Private
@Unstable
public class ProtoUtils {

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.yarn.util.resource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
@ -30,14 +28,11 @@ import org.apache.hadoop.yarn.api.records.Resource;
@Unstable
public abstract class ResourceCalculator {
private static final Log LOG = LogFactory.getLog(ResourceCalculator.class);
public abstract int
compare(Resource clusterResource, Resource lhs, Resource rhs);
public static int divideAndCeil(int a, int b) {
if (b == 0) {
LOG.info("divideAndCeil called with a=" + a + " b=" + b);
return 0;
}
return (a + (b - 1)) / b;

View File

@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.api;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.records.AMCommand;
@ -34,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
import org.junit.Assert;
import org.junit.Test;
/**
@ -52,8 +51,8 @@ import org.junit.Test;
* License for the specific language governing permissions and limitations under
* the License.
*/
public class TestAllocateResponse {
@SuppressWarnings("deprecation")
@Test
public void testAllocateResponseWithIncDecContainers() {
List<ContainerResourceIncrease> incContainers =
@ -96,6 +95,7 @@ public class TestAllocateResponse {
}
}
@SuppressWarnings("deprecation")
@Test
public void testAllocateResponseWithoutIncDecContainers() {
AllocateResponse r =

View File

@ -22,7 +22,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -46,7 +50,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -63,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
@ -106,18 +110,12 @@ public class ApplicationMasterService extends AbstractService implements
RecordFactoryProvider.getRecordFactory(null);
private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
private final AllocateResponse resync =
recordFactory.newRecordInstance(AllocateResponse.class);
private final AllocateResponse shutdown =
recordFactory.newRecordInstance(AllocateResponse.class);
private final RMContext rmContext;
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
super(ApplicationMasterService.class.getName());
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rScheduler = scheduler;
this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN);
this.resync.setAMCommand(AMCommand.AM_RESYNC);
this.rmContext = rmContext;
}
@ -429,36 +427,35 @@ public class ApplicationMasterService extends AbstractService implements
/* check if its in cache */
AllocateResponseLock lock = responseMap.get(appAttemptId);
if (lock == null) {
LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
return shutdown;
String message =
"Application attempt " + appAttemptId
+ " doesn't exist in ApplicationMasterService cache.";
LOG.error(message);
throw new ApplicationAttemptNotFoundException(message);
}
synchronized (lock) {
AllocateResponse lastResponse = lock.getAllocateResponse();
if (!hasApplicationMasterRegistered(appAttemptId)) {
String message =
"Application Master is not registered for known application: "
+ applicationId
+ ". Let AM resync.";
"AM is not registered for known application attempt: " + appAttemptId
+ " or RM had restarted after AM registered . AM should re-register.";
LOG.info(message);
RMAuditLogger.logFailure(
this.rmContext.getRMApps().get(applicationId)
.getUser(), AuditConstants.REGISTER_AM, "",
"ApplicationMasterService", message,
applicationId,
appAttemptId);
return resync;
this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
.getUser(), AuditConstants.AM_ALLOCATE, "",
"ApplicationMasterService", message, applicationId, appAttemptId);
throw new ApplicationMasterNotRegisteredException(message);
}
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
/* old heartbeat */
return lastResponse;
} else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
// Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
// Reboot is not useful since after AM reboots, it will send register
// and
// get an exception. Might as well throw an exception here.
return resync;
String message =
"Invalid responseId in AllocateRequest from application attempt: "
+ appAttemptId + ", expect responseId to be "
+ (lastResponse.getResponseId() + 1);
throw new InvalidApplicationMasterRequestException(message);
}
//filter illegal progress values

View File

@ -50,6 +50,7 @@ public class RMAuditLogger {
public static final String FINISH_FAILED_APP = "Application Finished - Failed";
public static final String FINISH_KILLED_APP = "Application Finished - Killed";
public static final String REGISTER_AM = "Register App Master";
public static final String AM_ALLOCATE = "App Master Heartbeats";
public static final String UNREGISTER_AM = "Unregister App Master";
public static final String ALLOC_CONTAINER = "AM Allocated Container";
public static final String RELEASE_CONTAINER = "AM Released Container";

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@ -44,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@ -195,29 +196,33 @@ public class TestApplicationMasterLauncher {
// request for containers
int request = 2;
AllocateResponse ar =
am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC);
AllocateResponse ar = null;
try {
ar = am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
Assert.fail();
} catch (ApplicationMasterNotRegisteredException e) {
}
// kick the scheduler
nm1.nodeHeartbeat(true);
AllocateResponse amrs =
am.allocate(new ArrayList<ResourceRequest>(),
AllocateResponse amrs = null;
try {
amrs = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC);
Assert.fail();
} catch (ApplicationMasterNotRegisteredException e) {
}
am.registerAppAttempt();
thrown = false;
try {
am.registerAppAttempt(false);
}
catch (Exception e) {
am.registerAppAttempt(false);
Assert.fail();
} catch (Exception e) {
Assert.assertEquals("Application Master is already registered : "
+ attempt.getAppAttemptId().getApplicationId(),
e.getMessage());
thrown = true;
}
Assert.assertTrue(thrown);
// Simulate an AM that was disconnected and app attempt was removed
// (responseMap does not contain attemptid)
@ -226,9 +231,11 @@ public class TestApplicationMasterLauncher {
ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FINISHED);
AllocateResponse amrs2 =
am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN);
try {
amrs = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
Assert.fail();
} catch (ApplicationAttemptNotFoundException e) {
}
}
}

View File

@ -52,7 +52,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
@ -61,7 +60,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -78,6 +76,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@ -316,10 +315,13 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
// verify old AM is not accepted
// change running AM to talk to new RM
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
AllocateResponse allocResponse = am1.allocate(
new ArrayList<ResourceRequest>(),
try {
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand());
Assert.fail();
} catch (ApplicationAttemptNotFoundException e) {
Assert.assertTrue(e instanceof ApplicationAttemptNotFoundException);
}
// NM should be rebooted on heartbeat, even first heartbeat for nm2
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@ -1749,8 +1751,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
// recover app
RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());

View File

@ -20,13 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.security.PrivilegedExceptionAction;
import org.junit.Assert;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@ -35,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -107,7 +106,12 @@ public class TestAMRMRPCResponseId {
/** try sending old request again **/
allocateRequest = AllocateRequest.newInstance(0, 0F, null, null, null);
response = allocate(attempt.getAppAttemptId(), allocateRequest);
Assert.assertTrue(response.getAMCommand() == AMCommand.AM_RESYNC);
try {
allocate(attempt.getAppAttemptId(), allocateRequest);
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getCause() instanceof InvalidApplicationMasterRequestException);
}
}
}

View File

@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -206,6 +207,7 @@ public class TestAMRMTokens {
*
* @throws Exception
*/
@SuppressWarnings("deprecation")
@Test
public void testMasterKeyRollOver() throws Exception {