YARN-2209. Replaced AM resync/shutdown command with corresponding exceptions and made related MR changes. Contributed by Jian He.
(cherry picked from commit 0f3b6900be
)
Conflicts:
hadoop-yarn-project/CHANGES.txt
This commit is contained in:
parent
82ac5c901e
commit
09e422302f
|
@ -36,12 +36,13 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
|||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
|
@ -98,11 +99,24 @@ public class LocalContainerAllocator extends RMCommunicator
|
|||
AllocateRequest.newInstance(this.lastResponseID,
|
||||
super.getApplicationProgress(), new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>(), null);
|
||||
AllocateResponse allocateResponse;
|
||||
try {
|
||||
allocateResponse = scheduler.allocate(allocateRequest);
|
||||
scheduler.allocate(allocateRequest);
|
||||
// Reset retry count if no exception occurred.
|
||||
retrystartTime = System.currentTimeMillis();
|
||||
} catch (ApplicationAttemptNotFoundException e) {
|
||||
LOG.info("Event from RM: shutting down Application Master");
|
||||
// This can happen if the RM has been restarted. If it is in that state,
|
||||
// this application must clean itself up.
|
||||
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
||||
JobEventType.JOB_AM_REBOOT));
|
||||
throw new YarnRuntimeException(
|
||||
"Resource Manager doesn't recognize AttemptId: "
|
||||
+ this.getContext().getApplicationID(), e);
|
||||
} catch (ApplicationMasterNotRegisteredException e) {
|
||||
LOG.info("ApplicationMaster is out of sync with ResourceManager,"
|
||||
+ " hence resync and send outstanding requests.");
|
||||
this.lastResponseID = 0;
|
||||
register();
|
||||
} catch (Exception e) {
|
||||
// This can happen when the connection to the RM has gone down. Keep
|
||||
// re-trying until the retryInterval has expired.
|
||||
|
@ -117,29 +131,6 @@ public class LocalContainerAllocator extends RMCommunicator
|
|||
// continue to attempt to contact the RM.
|
||||
throw e;
|
||||
}
|
||||
if (allocateResponse.getAMCommand() != null) {
|
||||
switch(allocateResponse.getAMCommand()) {
|
||||
case AM_RESYNC:
|
||||
LOG.info("ApplicationMaster is out of sync with ResourceManager,"
|
||||
+ " hence resyncing.");
|
||||
this.lastResponseID = 0;
|
||||
register();
|
||||
break;
|
||||
case AM_SHUTDOWN:
|
||||
LOG.info("Event from RM: shutting down Application Master");
|
||||
// This can happen if the RM has been restarted. If it is in that state,
|
||||
// this application must clean itself up.
|
||||
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
||||
JobEventType.JOB_AM_REBOOT));
|
||||
throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
|
||||
this.getContext().getApplicationID());
|
||||
default:
|
||||
String msg =
|
||||
"Unhandled value of AMCommand: " + allocateResponse.getAMCommand();
|
||||
LOG.error(msg);
|
||||
throw new YarnRuntimeException(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
|
@ -74,6 +74,8 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
|
@ -240,7 +242,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
protected synchronized void heartbeat() throws Exception {
|
||||
scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
|
||||
List<Container> allocatedContainers = getResources();
|
||||
if (allocatedContainers.size() > 0) {
|
||||
if (allocatedContainers != null && allocatedContainers.size() > 0) {
|
||||
scheduledRequests.assign(allocatedContainers);
|
||||
}
|
||||
|
||||
|
@ -665,6 +667,22 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
response = makeRemoteRequest();
|
||||
// Reset retry count if no exception occurred.
|
||||
retrystartTime = System.currentTimeMillis();
|
||||
} catch (ApplicationAttemptNotFoundException e ) {
|
||||
// This can happen if the RM has been restarted. If it is in that state,
|
||||
// this application must clean itself up.
|
||||
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
||||
JobEventType.JOB_AM_REBOOT));
|
||||
throw new YarnRuntimeException(
|
||||
"Resource Manager doesn't recognize AttemptId: "
|
||||
+ this.getContext().getApplicationID(), e);
|
||||
} catch (ApplicationMasterNotRegisteredException e) {
|
||||
LOG.info("ApplicationMaster is out of sync with ResourceManager,"
|
||||
+ " hence resync and send outstanding requests.");
|
||||
// RM may have restarted, re-register with RM.
|
||||
lastResponseID = 0;
|
||||
register();
|
||||
addOutstandingRequestOnResync();
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
// This can happen when the connection to the RM has gone down. Keep
|
||||
// re-trying until the retryInterval has expired.
|
||||
|
@ -679,32 +697,6 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
// continue to attempt to contact the RM.
|
||||
throw e;
|
||||
}
|
||||
if (response.getAMCommand() != null) {
|
||||
switch(response.getAMCommand()) {
|
||||
case AM_RESYNC:
|
||||
LOG.info("ApplicationMaster is out of sync with ResourceManager,"
|
||||
+ " hence resyncing.");
|
||||
lastResponseID = 0;
|
||||
|
||||
// Registering to allow RM to discover an active AM for this
|
||||
// application
|
||||
register();
|
||||
addOutstandingRequestOnResync();
|
||||
break;
|
||||
case AM_SHUTDOWN:
|
||||
// This can happen if the RM has been restarted. If it is in that state,
|
||||
// this application must clean itself up.
|
||||
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
||||
JobEventType.JOB_AM_REBOOT));
|
||||
throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
|
||||
this.getContext().getApplicationID());
|
||||
default:
|
||||
String msg =
|
||||
"Unhandled value of AMCommand: " + response.getAMCommand();
|
||||
LOG.error(msg);
|
||||
throw new YarnRuntimeException(msg);
|
||||
}
|
||||
}
|
||||
Resource newHeadRoom =
|
||||
getAvailableResources() == null ? Resources.none()
|
||||
: getAvailableResources();
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.util.TreeSet;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
|
@ -40,7 +39,6 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
|||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
@ -51,6 +49,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
|
||||
/**
|
||||
* Keeps the data structures to send container requests to RM.
|
||||
|
@ -176,7 +176,8 @@ public abstract class RMContainerRequestor extends RMCommunicator {
|
|||
LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
|
||||
}
|
||||
|
||||
protected AllocateResponse makeRemoteRequest() throws IOException {
|
||||
protected AllocateResponse makeRemoteRequest() throws YarnException,
|
||||
IOException {
|
||||
ResourceBlacklistRequest blacklistRequest =
|
||||
ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
|
||||
new ArrayList<String>(blacklistRemovals));
|
||||
|
@ -184,16 +185,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
|
|||
AllocateRequest.newInstance(lastResponseID,
|
||||
super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
|
||||
new ArrayList<ContainerId>(release), blacklistRequest);
|
||||
AllocateResponse allocateResponse;
|
||||
try {
|
||||
allocateResponse = scheduler.allocate(allocateRequest);
|
||||
} catch (YarnException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
if (isResyncCommand(allocateResponse)) {
|
||||
return allocateResponse;
|
||||
}
|
||||
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
|
||||
lastResponseID = allocateResponse.getResponseId();
|
||||
availableResources = allocateResponse.getAvailableResources();
|
||||
lastClusterNmCount = clusterNmCount;
|
||||
|
@ -222,11 +214,6 @@ public abstract class RMContainerRequestor extends RMCommunicator {
|
|||
return allocateResponse;
|
||||
}
|
||||
|
||||
protected boolean isResyncCommand(AllocateResponse allocateResponse) {
|
||||
return allocateResponse.getAMCommand() != null
|
||||
&& allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
|
||||
}
|
||||
|
||||
protected void addOutstandingRequestOnResync() {
|
||||
for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable
|
||||
.values()) {
|
||||
|
|
|
@ -31,15 +31,17 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.app.MRApp;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.Assert;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -50,6 +52,9 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.app.MRApp;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
||||
|
@ -86,9 +91,11 @@ import org.apache.hadoop.yarn.event.Dispatcher;
|
|||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
|
@ -102,10 +109,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -1749,14 +1758,11 @@ public class TestRMContainerAllocator {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected AllocateResponse makeRemoteRequest() throws IOException {
|
||||
protected AllocateResponse makeRemoteRequest() throws IOException,
|
||||
YarnException {
|
||||
allocateResponse = super.makeRemoteRequest();
|
||||
return allocateResponse;
|
||||
}
|
||||
|
||||
public boolean isResyncCommand() {
|
||||
return super.isResyncCommand(allocateResponse);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -2250,8 +2256,6 @@ public class TestRMContainerAllocator {
|
|||
// send allocate request to 2nd RM and get resync command
|
||||
allocator.schedule();
|
||||
dispatcher.await();
|
||||
Assert.assertTrue("Last allocate response is not RESYNC",
|
||||
allocator.isResyncCommand());
|
||||
|
||||
// Step-5 : On Resync,AM sends all outstanding
|
||||
// asks,release,blacklistAaddition
|
||||
|
|
|
@ -328,6 +328,9 @@ Release 2.6.0 - UNRELEASED
|
|||
DefaultContainerExecutor#getFirstApplicationDir and use getWorkingDir()
|
||||
instead. (Zhihai Xu via jianhe)
|
||||
|
||||
YARN-2209. Replaced AM resync/shutdown command with corresponding exceptions and
|
||||
made related MR changes. (Jian He via zjshen)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -86,7 +86,7 @@ public abstract class AllocateResponse {
|
|||
response.setNMTokens(nmTokens);
|
||||
return response;
|
||||
}
|
||||
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
public static AllocateResponse newInstance(int responseId,
|
||||
|
|
|
@ -20,7 +20,11 @@ package org.apache.hadoop.yarn.api.records;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
|
||||
/**
|
||||
* Command sent by the Resource Manager to the Application Master in the
|
||||
|
@ -30,16 +34,26 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|||
@Public
|
||||
@Unstable
|
||||
public enum AMCommand {
|
||||
|
||||
/**
|
||||
* Sent by Resource Manager when it is out of sync with the AM and wants the
|
||||
* AM get back in sync.
|
||||
* @deprecated Sent by Resource Manager when it is out of sync with the AM and
|
||||
* wants the AM get back in sync.
|
||||
*
|
||||
* Note: Instead of sending this command,
|
||||
* {@link ApplicationMasterNotRegisteredException} will be thrown
|
||||
* when ApplicationMaster is out of sync with ResourceManager and
|
||||
* ApplicationMaster is expected to re-register with RM by calling
|
||||
* {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}
|
||||
*/
|
||||
AM_RESYNC,
|
||||
|
||||
|
||||
/**
|
||||
* Sent by Resource Manager when it wants the AM to shutdown. Eg. when the
|
||||
* node is going down for maintenance. The AM should save any state and
|
||||
* prepare to be restarted at a later time.
|
||||
* @deprecated Sent by Resource Manager when it wants the AM to shutdown.
|
||||
* Note: This command was earlier sent by ResourceManager to
|
||||
* instruct AM to shutdown if RM had restarted. Now
|
||||
* {@link ApplicationAttemptNotFoundException} will be thrown in case
|
||||
* that RM has restarted and AM is supposed to handle this
|
||||
* exception by shutting down itself.
|
||||
*/
|
||||
AM_SHUTDOWN
|
||||
}
|
||||
|
|
|
@ -21,12 +21,16 @@ package org.apache.hadoop.yarn.exceptions;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
|
||||
|
||||
/**
|
||||
* This exception is thrown on
|
||||
* {@link ApplicationHistoryProtocol#getApplicationAttemptReport (GetApplicationAttemptReportRequest)}
|
||||
* API when the Application Attempt doesn't exist in Application History Server
|
||||
* API when the Application Attempt doesn't exist in Application History Server or
|
||||
* {@link ApplicationMasterProtocol#allocate(AllocateRequest)} if application
|
||||
* doesn't exist in RM.
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
|
@ -43,6 +42,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
|
|||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
|
||||
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
||||
|
@ -219,9 +219,13 @@ extends AMRMClientAsync<T> {
|
|||
if (!keepRunning) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
response = client.allocate(progress);
|
||||
} catch (ApplicationAttemptNotFoundException e) {
|
||||
handler.onShutdownRequest();
|
||||
LOG.info("Shutdown requested. Stopping callback.");
|
||||
return;
|
||||
} catch (Throwable ex) {
|
||||
LOG.error("Exception on heartbeat", ex);
|
||||
savedException = ex;
|
||||
|
@ -229,21 +233,17 @@ extends AMRMClientAsync<T> {
|
|||
handlerThread.interrupt();
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (response != null) {
|
||||
while (true) {
|
||||
try {
|
||||
responseQueue.put(response);
|
||||
if (response.getAMCommand() == AMCommand.AM_SHUTDOWN) {
|
||||
return;
|
||||
if (response != null) {
|
||||
while (true) {
|
||||
try {
|
||||
responseQueue.put(response);
|
||||
break;
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.debug("Interrupted while waiting to put on response queue", ex);
|
||||
}
|
||||
break;
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.debug("Interrupted while waiting to put on response queue", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(heartbeatIntervalMs.get());
|
||||
} catch (InterruptedException ex) {
|
||||
|
@ -276,20 +276,6 @@ extends AMRMClientAsync<T> {
|
|||
LOG.info("Interrupted while waiting for queue", ex);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (response.getAMCommand() != null) {
|
||||
switch(response.getAMCommand()) {
|
||||
case AM_SHUTDOWN:
|
||||
handler.onShutdownRequest();
|
||||
LOG.info("Shutdown requested. Stopping callback.");
|
||||
return;
|
||||
default:
|
||||
String msg =
|
||||
"Unhandled value of RM AMCommand: " + response.getAMCommand();
|
||||
LOG.error(msg);
|
||||
throw new YarnRuntimeException(msg);
|
||||
}
|
||||
}
|
||||
List<NodeReport> updatedNodes = response.getUpdatedNodes();
|
||||
if (!updatedNodes.isEmpty()) {
|
||||
handler.onNodesUpdated(updatedNodes);
|
||||
|
|
|
@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
|
@ -275,15 +274,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
blacklistRemovals.clear();
|
||||
}
|
||||
|
||||
allocateResponse = rmClient.allocate(allocateRequest);
|
||||
if (isResyncCommand(allocateResponse)) {
|
||||
try {
|
||||
allocateResponse = rmClient.allocate(allocateRequest);
|
||||
} catch (ApplicationMasterNotRegisteredException e) {
|
||||
LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
|
||||
+ " hence resyncing.");
|
||||
synchronized (this) {
|
||||
release.addAll(this.pendingRelease);
|
||||
blacklistAdditions.addAll(this.blacklistedNodes);
|
||||
for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr : remoteRequestsTable
|
||||
.values()) {
|
||||
.values()) {
|
||||
for (Map<Resource, ResourceRequestInfo> capabalities : rr.values()) {
|
||||
for (ResourceRequestInfo request : capabalities.values()) {
|
||||
addResourceRequestToAsk(request.remoteRequest);
|
||||
|
@ -293,7 +293,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
}
|
||||
// re register with RM
|
||||
registerApplicationMaster();
|
||||
return allocate(progressIndicator);
|
||||
allocateResponse = allocate(progressIndicator);
|
||||
return allocateResponse;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
|
@ -349,11 +350,6 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean isResyncCommand(AllocateResponse allocateResponse) {
|
||||
return allocateResponse.getAMCommand() != null
|
||||
&& allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
protected void populateNMTokens(List<NMToken> nmTokens) {
|
||||
|
|
|
@ -827,7 +827,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
|
|||
return AllocateResponse.newInstance(-1,
|
||||
new ArrayList<ContainerStatus>(),
|
||||
new ArrayList<Container>(), new ArrayList<NodeReport>(),
|
||||
Resource.newInstance(1024, 2), AMCommand.AM_RESYNC, 1,
|
||||
Resource.newInstance(1024, 2), null, 1,
|
||||
null, new ArrayList<NMToken>());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,16 +18,15 @@
|
|||
|
||||
package org.apache.hadoop.yarn.client.api.async.impl;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyFloat;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -36,13 +35,10 @@ import java.util.List;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
|
@ -56,12 +52,16 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
|
|||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
|
||||
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
public class TestAMRMClientAsync {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class);
|
||||
|
@ -211,10 +211,10 @@ public class TestAMRMClientAsync {
|
|||
@SuppressWarnings("unchecked")
|
||||
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
|
||||
|
||||
final AllocateResponse shutDownResponse = createAllocateResponse(
|
||||
new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
|
||||
shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN);
|
||||
when(client.allocate(anyFloat())).thenReturn(shutDownResponse);
|
||||
createAllocateResponse(new ArrayList<ContainerStatus>(),
|
||||
new ArrayList<Container>(), null);
|
||||
when(client.allocate(anyFloat())).thenThrow(
|
||||
new ApplicationAttemptNotFoundException("app not found, shut down"));
|
||||
|
||||
AMRMClientAsync<ContainerRequest> asyncClient =
|
||||
AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler);
|
||||
|
@ -235,11 +235,8 @@ public class TestAMRMClientAsync {
|
|||
final TestCallbackHandler callbackHandler = new TestCallbackHandler();
|
||||
@SuppressWarnings("unchecked")
|
||||
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
|
||||
|
||||
final AllocateResponse shutDownResponse = createAllocateResponse(
|
||||
new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
|
||||
shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN);
|
||||
when(client.allocate(anyFloat())).thenReturn(shutDownResponse);
|
||||
when(client.allocate(anyFloat())).thenThrow(
|
||||
new ApplicationAttemptNotFoundException("app not found, shut down"));
|
||||
|
||||
AMRMClientAsync<ContainerRequest> asyncClient =
|
||||
AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler);
|
||||
|
|
|
@ -932,7 +932,7 @@ public class TestAMRMClient {
|
|||
Assert.assertNotEquals(amrmToken_1, amrmToken_2);
|
||||
|
||||
// can do the allocate call with latest AMRMToken
|
||||
amClient.allocate(0.1f);
|
||||
AllocateResponse response = amClient.allocate(0.1f);
|
||||
|
||||
// Verify latest AMRMToken can be used to send allocation request.
|
||||
UserGroupInformation testUser1 =
|
||||
|
@ -953,7 +953,8 @@ public class TestAMRMClient {
|
|||
.getResourceManager().getApplicationMasterService().getBindAddress());
|
||||
testUser1.addToken(newVersionToken);
|
||||
|
||||
|
||||
AllocateRequest request = Records.newRecord(AllocateRequest.class);
|
||||
request.setResponseId(response.getResponseId());
|
||||
testUser1.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
|
||||
@Override
|
||||
public ApplicationMasterProtocol run() {
|
||||
|
@ -962,7 +963,7 @@ public class TestAMRMClient {
|
|||
yarnCluster.getResourceManager().getApplicationMasterService()
|
||||
.getBindAddress(), conf);
|
||||
}
|
||||
}).allocate(Records.newRecord(AllocateRequest.class));
|
||||
}).allocate(request);
|
||||
|
||||
// Make sure previous token has been rolled-over
|
||||
// and can not use this rolled-over token to make a allocate all.
|
||||
|
|
|
@ -51,11 +51,10 @@ import org.apache.hadoop.yarn.proto.YarnProtos.QueueStateProto;
|
|||
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class ProtoUtils {
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.util.resource;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
@ -30,14 +28,11 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
@Unstable
|
||||
public abstract class ResourceCalculator {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ResourceCalculator.class);
|
||||
|
||||
public abstract int
|
||||
compare(Resource clusterResource, Resource lhs, Resource rhs);
|
||||
|
||||
public static int divideAndCeil(int a, int b) {
|
||||
if (b == 0) {
|
||||
LOG.info("divideAndCeil called with a=" + a + " b=" + b);
|
||||
return 0;
|
||||
}
|
||||
return (a + (b - 1)) / b;
|
||||
|
|
|
@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.api;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
|
@ -34,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
|
|||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -52,8 +51,8 @@ import org.junit.Test;
|
|||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
public class TestAllocateResponse {
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testAllocateResponseWithIncDecContainers() {
|
||||
List<ContainerResourceIncrease> incContainers =
|
||||
|
@ -96,6 +95,7 @@ public class TestAllocateResponse {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testAllocateResponseWithoutIncDecContainers() {
|
||||
AllocateResponse r =
|
||||
|
|
|
@ -399,7 +399,7 @@ public class BuilderUtils {
|
|||
url.setFile(file);
|
||||
return url;
|
||||
}
|
||||
|
||||
|
||||
public static AllocateResponse newAllocateResponse(int responseId,
|
||||
List<ContainerStatus> completedContainers,
|
||||
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
|
||||
|
|
|
@ -22,7 +22,11 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
|
@ -46,7 +50,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
|
@ -63,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
|||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
|
||||
|
@ -106,18 +110,12 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
RecordFactoryProvider.getRecordFactory(null);
|
||||
private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
|
||||
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
|
||||
private final AllocateResponse resync =
|
||||
recordFactory.newRecordInstance(AllocateResponse.class);
|
||||
private final AllocateResponse shutdown =
|
||||
recordFactory.newRecordInstance(AllocateResponse.class);
|
||||
private final RMContext rmContext;
|
||||
|
||||
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
|
||||
super(ApplicationMasterService.class.getName());
|
||||
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
|
||||
this.rScheduler = scheduler;
|
||||
this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN);
|
||||
this.resync.setAMCommand(AMCommand.AM_RESYNC);
|
||||
this.rmContext = rmContext;
|
||||
}
|
||||
|
||||
|
@ -429,36 +427,35 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
/* check if its in cache */
|
||||
AllocateResponseLock lock = responseMap.get(appAttemptId);
|
||||
if (lock == null) {
|
||||
LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
|
||||
return shutdown;
|
||||
String message =
|
||||
"Application attempt " + appAttemptId
|
||||
+ " doesn't exist in ApplicationMasterService cache.";
|
||||
LOG.error(message);
|
||||
throw new ApplicationAttemptNotFoundException(message);
|
||||
}
|
||||
synchronized (lock) {
|
||||
AllocateResponse lastResponse = lock.getAllocateResponse();
|
||||
if (!hasApplicationMasterRegistered(appAttemptId)) {
|
||||
String message =
|
||||
"Application Master is not registered for known application: "
|
||||
+ applicationId
|
||||
+ ". Let AM resync.";
|
||||
"AM is not registered for known application attempt: " + appAttemptId
|
||||
+ " or RM had restarted after AM registered . AM should re-register.";
|
||||
LOG.info(message);
|
||||
RMAuditLogger.logFailure(
|
||||
this.rmContext.getRMApps().get(applicationId)
|
||||
.getUser(), AuditConstants.REGISTER_AM, "",
|
||||
"ApplicationMasterService", message,
|
||||
applicationId,
|
||||
appAttemptId);
|
||||
return resync;
|
||||
this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
|
||||
.getUser(), AuditConstants.AM_ALLOCATE, "",
|
||||
"ApplicationMasterService", message, applicationId, appAttemptId);
|
||||
throw new ApplicationMasterNotRegisteredException(message);
|
||||
}
|
||||
|
||||
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
|
||||
/* old heartbeat */
|
||||
return lastResponse;
|
||||
} else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
|
||||
LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
|
||||
// Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
|
||||
// Reboot is not useful since after AM reboots, it will send register
|
||||
// and
|
||||
// get an exception. Might as well throw an exception here.
|
||||
return resync;
|
||||
String message =
|
||||
"Invalid responseId in AllocateRequest from application attempt: "
|
||||
+ appAttemptId + ", expect responseId to be "
|
||||
+ (lastResponse.getResponseId() + 1);
|
||||
throw new InvalidApplicationMasterRequestException(message);
|
||||
}
|
||||
|
||||
//filter illegal progress values
|
||||
|
|
|
@ -50,6 +50,7 @@ public class RMAuditLogger {
|
|||
public static final String FINISH_FAILED_APP = "Application Finished - Failed";
|
||||
public static final String FINISH_KILLED_APP = "Application Finished - Killed";
|
||||
public static final String REGISTER_AM = "Register App Master";
|
||||
public static final String AM_ALLOCATE = "App Master Heartbeats";
|
||||
public static final String UNREGISTER_AM = "Unregister App Master";
|
||||
public static final String ALLOC_CONTAINER = "AM Allocated Container";
|
||||
public static final String RELEASE_CONTAINER = "AM Released Container";
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
|
@ -44,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|||
import org.apache.hadoop.yarn.api.records.SerializedException;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
|
@ -195,29 +196,33 @@ public class TestApplicationMasterLauncher {
|
|||
|
||||
// request for containers
|
||||
int request = 2;
|
||||
AllocateResponse ar =
|
||||
am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
|
||||
Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC);
|
||||
AllocateResponse ar = null;
|
||||
try {
|
||||
ar = am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
|
||||
Assert.fail();
|
||||
} catch (ApplicationMasterNotRegisteredException e) {
|
||||
}
|
||||
|
||||
// kick the scheduler
|
||||
nm1.nodeHeartbeat(true);
|
||||
AllocateResponse amrs =
|
||||
am.allocate(new ArrayList<ResourceRequest>(),
|
||||
|
||||
AllocateResponse amrs = null;
|
||||
try {
|
||||
amrs = am.allocate(new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>());
|
||||
Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC);
|
||||
Assert.fail();
|
||||
} catch (ApplicationMasterNotRegisteredException e) {
|
||||
}
|
||||
|
||||
am.registerAppAttempt();
|
||||
thrown = false;
|
||||
try {
|
||||
am.registerAppAttempt(false);
|
||||
}
|
||||
catch (Exception e) {
|
||||
am.registerAppAttempt(false);
|
||||
Assert.fail();
|
||||
} catch (Exception e) {
|
||||
Assert.assertEquals("Application Master is already registered : "
|
||||
+ attempt.getAppAttemptId().getApplicationId(),
|
||||
e.getMessage());
|
||||
thrown = true;
|
||||
}
|
||||
Assert.assertTrue(thrown);
|
||||
|
||||
// Simulate an AM that was disconnected and app attempt was removed
|
||||
// (responseMap does not contain attemptid)
|
||||
|
@ -226,9 +231,11 @@ public class TestApplicationMasterLauncher {
|
|||
ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
|
||||
AllocateResponse amrs2 =
|
||||
am.allocate(new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>());
|
||||
Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN);
|
||||
try {
|
||||
amrs = am.allocate(new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>());
|
||||
Assert.fail();
|
||||
} catch (ApplicationAttemptNotFoundException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,7 +53,6 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
|
@ -62,7 +61,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -79,6 +77,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
|
@ -297,10 +296,13 @@ public class TestRMRestart {
|
|||
// verify old AM is not accepted
|
||||
// change running AM to talk to new RM
|
||||
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
|
||||
AllocateResponse allocResponse = am1.allocate(
|
||||
new ArrayList<ResourceRequest>(),
|
||||
try {
|
||||
am1.allocate(new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>());
|
||||
Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand());
|
||||
Assert.fail();
|
||||
} catch (ApplicationAttemptNotFoundException e) {
|
||||
Assert.assertTrue(e instanceof ApplicationAttemptNotFoundException);
|
||||
}
|
||||
|
||||
// NM should be rebooted on heartbeat, even first heartbeat for nm2
|
||||
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
|
||||
|
@ -1758,8 +1760,7 @@ public class TestRMRestart {
|
|||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
// recover app
|
||||
RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
|
||||
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
|
||||
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
|
||||
|
||||
nm1.nodeHeartbeat(true);
|
||||
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
|
||||
|
||||
|
|
|
@ -20,13 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
|
|||
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
|
@ -35,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -107,7 +106,12 @@ public class TestAMRMRPCResponseId {
|
|||
|
||||
/** try sending old request again **/
|
||||
allocateRequest = AllocateRequest.newInstance(0, 0F, null, null, null);
|
||||
response = allocate(attempt.getAppAttemptId(), allocateRequest);
|
||||
Assert.assertTrue(response.getAMCommand() == AMCommand.AM_RESYNC);
|
||||
|
||||
try {
|
||||
allocate(attempt.getAppAttemptId(), allocateRequest);
|
||||
Assert.fail();
|
||||
} catch (Exception e) {
|
||||
Assert.assertTrue(e.getCause() instanceof InvalidApplicationMasterRequestException);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
|
|||
import java.security.PrivilegedAction;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -206,6 +207,7 @@ public class TestAMRMTokens {
|
|||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testMasterKeyRollOver() throws Exception {
|
||||
|
||||
|
|
Loading…
Reference in New Issue