YARN-4965. Distributed shell AM failed due to ClientHandlerException thrown by jersey. Contributed by Junping Du

(cherry picked from commit e6c0742012)
(cherry picked from commit 94e4d349b0)
commit 84ab382939 (parent 1e33c05a7e)
Author: Xuan
Date:   2016-04-16 19:39:18 -07:00
5 changed files with 67 additions and 10 deletions
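
The fix is the same at every call site: the Jersey transport under TimelineClient can throw the unchecked com.sun.jersey.api.client.ClientHandlerException (for example when the timeline server refuses connections), and before this patch that exception escaped the existing catch blocks and killed the distributed shell AM. A minimal standalone sketch of the pattern the patch applies (the class and method names here are illustrative, not part of the commit):

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.exceptions.YarnException;

import com.sun.jersey.api.client.ClientHandlerException;

// Illustrative only: the widened catch clause this commit applies at each publish site.
class TimelinePublishSketch {
  private static final Log LOG = LogFactory.getLog(TimelinePublishSketch.class);

  static void publish(TimelineClient timelineClient, TimelineEntity entity) {
    try {
      // putEntities() declares IOException and YarnException, but the Jersey
      // client underneath can also surface the unchecked ClientHandlerException.
      timelineClient.putEntities(entity);
    } catch (YarnException | IOException | ClientHandlerException e) {
      // Log and keep going: a timeline-service outage must not fail the AM or job.
      LOG.error("Entity " + entity.getEntityId()
          + " could not be published to the timeline server", e);
    }
  }
}

The JobHistoryEventHandler and ApplicationMaster hunks below apply exactly this widening of the catch clause; the remaining changes add a regression test that publishes a container end event through a timeline client whose writer is stubbed to throw ClientHandlerException.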


@@ -77,6 +77,8 @@
 import org.codehaus.jackson.node.ObjectNode;
 import com.google.common.annotations.VisibleForTesting;
+import com.sun.jersey.api.client.ClientHandlerException;
 /**
  * The job history events get routed to this class. This class writes the Job
  * history events to the DFS directly into a staging dir and then moved to a
@@ -1033,10 +1035,7 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
             + error.getErrorCode());
         }
       }
-    } catch (IOException ex) {
-      LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
-          + "Server", ex);
-    } catch (YarnException ex) {
+    } catch (YarnException | IOException | ClientHandlerException ex) {
       LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
           + "Server", ex);
     }


@@ -131,6 +131,12 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>


@@ -104,6 +104,7 @@
 import org.apache.log4j.LogManager;
 import com.google.common.annotations.VisibleForTesting;
+import com.sun.jersey.api.client.ClientHandlerException;
 /**
  * An ApplicationMaster for executing shell commands on a set of launched
@@ -1148,13 +1149,14 @@ private void publishContainerStartEvent(
           putContainerEntity(timelineClient,
               container.getId().getApplicationAttemptId(),
               entity));
-    } catch (YarnException | IOException e) {
+    } catch (YarnException | IOException | ClientHandlerException e) {
       LOG.error("Container start event could not be published for "
           + container.getId().toString(), e);
     }
   }
-  private void publishContainerEndEvent(
+  @VisibleForTesting
+  void publishContainerEndEvent(
       final TimelineClient timelineClient, ContainerStatus container,
       String domainId, UserGroupInformation ugi) {
     final TimelineEntity entity = new TimelineEntity();
@@ -1176,7 +1178,7 @@ private void publishContainerEndEvent(
           putContainerEntity(timelineClient,
               container.getContainerId().getApplicationAttemptId(),
               entity));
-    } catch (YarnException | IOException e) {
+    } catch (YarnException | IOException | ClientHandlerException e) {
       LOG.error("Container end event could not be published for "
           + container.getContainerId().toString(), e);
     }
@@ -1211,7 +1213,7 @@ private void publishApplicationAttemptEvent(
     try {
       TimelinePutResponse response = timelineClient.putEntities(entity);
       processTimelineResponseErrors(response);
-    } catch (YarnException | IOException e) {
+    } catch (YarnException | IOException | ClientHandlerException e) {
       LOG.error("App Attempt "
           + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
           + " event could not be published for "


@@ -18,6 +18,10 @@
 package org.apache.hadoop.yarn.applications.distributedshell;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -27,6 +31,7 @@
 import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.net.InetAddress;
+import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -46,14 +51,24 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.ServerSocketUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster;
+import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
+import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
+import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -61,6 +76,7 @@
 import org.apache.hadoop.yarn.server.timeline.NameValuePair;
 import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
 import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -69,6 +85,8 @@
 import org.junit.Test;
 import org.junit.rules.Timeout;
+import com.sun.jersey.api.client.ClientHandlerException;
 public class TestDistributedShell {
   private static final Log LOG =
@@ -77,6 +95,7 @@ public class TestDistributedShell {
   protected MiniYARNCluster yarnCluster = null;
   protected MiniDFSCluster hdfsCluster = null;
   private FileSystem fs = null;
+  private TimelineWriter spyTimelineWriter;
   protected YarnConfiguration conf = null;
   private static final int NUM_NMS = 1;
   private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
@@ -865,6 +884,37 @@ public void testDSShellWithInvalidArgs() throws Exception {
       }
     }
   }
+
+  @Test
+  public void testDSTimelineClientWithConnectionRefuse() throws Exception {
+    ApplicationMaster am = new ApplicationMaster();
+    TimelineClientImpl client = new TimelineClientImpl() {
+      @Override
+      protected TimelineWriter createTimelineWriter(Configuration conf,
+          UserGroupInformation authUgi, com.sun.jersey.api.client.Client client,
+          URI resURI) throws IOException {
+        TimelineWriter timelineWriter =
+            new DirectTimelineWriter(authUgi, client, resURI);
+        spyTimelineWriter = spy(timelineWriter);
+        return spyTimelineWriter;
+      }
+    };
+    client.init(conf);
+    client.start();
+    TestTimelineClient.mockEntityClientResponse(spyTimelineWriter, null,
+        false, true);
+    try {
+      UserGroupInformation ugi = mock(UserGroupInformation.class);
+      when(ugi.getShortUserName()).thenReturn("user1");
+      // verify no ClientHandlerException get thrown out.
+      am.publishContainerEndEvent(client, ContainerStatus.newInstance(
+          BuilderUtils.newContainerId(1, 1, 1, 1), ContainerState.COMPLETE, "",
+          1), "domainId", ugi);
+    } finally {
+      client.stop();
+    }
+  }
+
   protected void waitForNMsToRegister() throws Exception {
     int sec = 60;
     while (sec >= 0) {


@@ -298,7 +298,7 @@ private void assertException(TimelineClientImpl client, RuntimeException ce) {
         client.connectionRetry.getRetired());
   }
-  private static ClientResponse mockEntityClientResponse(
+  public static ClientResponse mockEntityClientResponse(
       TimelineWriter spyTimelineWriter, ClientResponse.Status status,
       boolean hasError, boolean hasRuntimeError) {
     ClientResponse response = mock(ClientResponse.class);
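
Widening the visibility of mockEntityClientResponse, together with the hadoop-yarn-common test-jar dependency added in the pom hunk above, appears to be what lets the distributed shell test module reuse this helper to simulate a Jersey-level failure. To exercise the new test on its own, something along these lines should work from the root of the source tree (the module path and Surefire -Dtest syntax are assumptions about the usual Hadoop layout, not taken from this commit):

mvn test -pl hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell \
    -Dtest=TestDistributedShell#testDSTimelineClientWithConnectionRefuse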