YARN-10341. Yarn Service Container Completed event doesn't get processed. Contributed by Bilwa S T.
(cherry picked from commit dfe60392c9
)
This commit is contained in:
parent
0aa2d7d506
commit
7b175739a9
|
@ -737,7 +737,7 @@ public class ServiceScheduler extends CompositeService {
|
|||
LOG.warn(
|
||||
"Container {} Completed. No component instance exists. exitStatus={}. diagnostics={} ",
|
||||
containerId, status.getExitStatus(), status.getDiagnostics());
|
||||
return;
|
||||
continue;
|
||||
}
|
||||
ComponentEvent event =
|
||||
new ComponentEvent(instance.getCompName(), CONTAINER_COMPLETED)
|
||||
|
|
|
@ -22,22 +22,29 @@ import com.google.common.collect.ImmutableMap;
|
|||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.curator.test.TestingCluster;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.security.DockerCredentialTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.service.api.records.Artifact;
|
||||
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
||||
import org.apache.hadoop.yarn.service.component.ComponentState;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
|
||||
|
@ -47,7 +54,9 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
|||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -63,6 +72,8 @@ import java.util.concurrent.TimeoutException;
|
|||
|
||||
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
public class TestServiceAM extends ServiceTestUtils{
|
||||
|
||||
|
@ -72,6 +83,9 @@ public class TestServiceAM extends ServiceTestUtils{
|
|||
private File basedir;
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
TestingCluster zkCluster;
|
||||
@Rule
|
||||
public ServiceTestUtils.ServiceFSWatcher rule =
|
||||
new ServiceTestUtils.ServiceFSWatcher();
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
|
@ -311,6 +325,80 @@ public class TestServiceAM extends ServiceTestUtils{
|
|||
am.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainerCompletedEventProcessed() throws Exception {
|
||||
ServiceContext context = createServiceContext("abc");
|
||||
MockServiceScheduler scheduler = new MockServiceScheduler(context);
|
||||
scheduler.init(conf);
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 0);
|
||||
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
|
||||
1);
|
||||
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 0);
|
||||
ContainerStatus containerStatus1 = ContainerStatus.newInstance(containerId1,
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
|
||||
"successful", 0);
|
||||
ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 1);
|
||||
ContainerStatus containerStatus2 = ContainerStatus.newInstance(containerId2,
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
|
||||
"successful", 0);
|
||||
ComponentInstance instance = Mockito.mock(ComponentInstance.class);
|
||||
Mockito.doReturn("componentInstance").when(instance).getCompName();
|
||||
scheduler.addLiveCompInstance(containerId2, instance);
|
||||
List<ContainerStatus> statuses = new ArrayList<>();
|
||||
// First container instance will be null
|
||||
statuses.add(containerStatus1);
|
||||
// Second container instance is added
|
||||
scheduler.addLiveCompInstance(containerId2, instance);
|
||||
statuses.add(containerStatus2);
|
||||
scheduler.callbackHandler.onContainersCompleted(statuses);
|
||||
// For second container event should be dispatched.
|
||||
verify(scheduler.dispatcher, times(1)).getEventHandler();
|
||||
DefaultMetricsSystem.shutdown();
|
||||
}
|
||||
|
||||
private ServiceContext createServiceContext(String name)
|
||||
throws Exception {
|
||||
Artifact artifact = new Artifact();
|
||||
artifact.setId("1");
|
||||
artifact.setType(Artifact.TypeEnum.TARBALL);
|
||||
Service serviceDef = ServiceTestUtils.createExampleApplication();
|
||||
ApplicationId applicationId = ApplicationId.newInstance(
|
||||
System.currentTimeMillis(), 1);
|
||||
serviceDef.setId(applicationId.toString());
|
||||
serviceDef.setName(name);
|
||||
serviceDef.setState(ServiceState.STARTED);
|
||||
serviceDef.getComponents().forEach(component ->
|
||||
component.setArtifact(artifact));
|
||||
ServiceContext context = new MockRunningServiceContext(rule,
|
||||
serviceDef);
|
||||
context.scheduler.getDispatcher().setDrainEventsOnStop();
|
||||
context.scheduler.getDispatcher().start();
|
||||
return context;
|
||||
}
|
||||
|
||||
class MockServiceScheduler extends ServiceScheduler {
|
||||
private AsyncDispatcher dispatcher;
|
||||
private AMRMClientCallback callbackHandler = new AMRMClientCallback();
|
||||
|
||||
MockServiceScheduler(ServiceContext context) {
|
||||
super(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AsyncDispatcher createAsyncDispatcher() {
|
||||
dispatcher = Mockito.mock(AsyncDispatcher.class);
|
||||
EventHandler<Event> handler = Mockito.mock(EventHandler.class);
|
||||
Mockito.doReturn(handler).when(dispatcher).getEventHandler();
|
||||
return dispatcher;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AMRMClientAsync<AMRMClient.ContainerRequest> createAMRMClient() {
|
||||
return AMRMClientAsync.createAMRMClientAsync(1000, callbackHandler);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecordTokensForContainers() throws Exception {
|
||||
ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
|
||||
|
|
Loading…
Reference in New Issue