YARN-4965. Distributed shell AM failed due to ClientHandlerException thrown by jersey. Contributed by Junping Du
(cherry picked from commit e6c0742012
)
This commit is contained in:
parent
edb096eab7
commit
94e4d349b0
|
@ -77,6 +77,8 @@ import org.codehaus.jackson.node.JsonNodeFactory;
|
|||
import org.codehaus.jackson.node.ObjectNode;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.sun.jersey.api.client.ClientHandlerException;
|
||||
|
||||
/**
|
||||
* The job history events get routed to this class. This class writes the Job
|
||||
* history events to the DFS directly into a staging dir and then moved to a
|
||||
|
@ -1033,12 +1035,9 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
+ error.getErrorCode());
|
||||
}
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
} catch (YarnException | IOException | ClientHandlerException ex) {
|
||||
LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
|
||||
+ "Server", ex);
|
||||
} catch (YarnException ex) {
|
||||
LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
|
||||
+ "Server", ex);
|
||||
+ "Server", ex);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -131,6 +131,12 @@
|
|||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-common</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
|
|
|
@ -104,6 +104,7 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
|||
import org.apache.log4j.LogManager;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.sun.jersey.api.client.ClientHandlerException;
|
||||
|
||||
/**
|
||||
* An ApplicationMaster for executing shell commands on a set of launched
|
||||
|
@ -1149,13 +1150,14 @@ public class ApplicationMaster {
|
|||
putContainerEntity(timelineClient,
|
||||
container.getId().getApplicationAttemptId(),
|
||||
entity));
|
||||
} catch (YarnException | IOException e) {
|
||||
} catch (YarnException | IOException | ClientHandlerException e) {
|
||||
LOG.error("Container start event could not be published for "
|
||||
+ container.getId().toString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private void publishContainerEndEvent(
|
||||
@VisibleForTesting
|
||||
void publishContainerEndEvent(
|
||||
final TimelineClient timelineClient, ContainerStatus container,
|
||||
String domainId, UserGroupInformation ugi) {
|
||||
final TimelineEntity entity = new TimelineEntity();
|
||||
|
@ -1177,7 +1179,7 @@ public class ApplicationMaster {
|
|||
putContainerEntity(timelineClient,
|
||||
container.getContainerId().getApplicationAttemptId(),
|
||||
entity));
|
||||
} catch (YarnException | IOException e) {
|
||||
} catch (YarnException | IOException | ClientHandlerException e) {
|
||||
LOG.error("Container end event could not be published for "
|
||||
+ container.getContainerId().toString(), e);
|
||||
}
|
||||
|
@ -1212,7 +1214,7 @@ public class ApplicationMaster {
|
|||
try {
|
||||
TimelinePutResponse response = timelineClient.putEntities(entity);
|
||||
processTimelineResponseErrors(response);
|
||||
} catch (YarnException | IOException e) {
|
||||
} catch (YarnException | IOException | ClientHandlerException e) {
|
||||
LOG.error("App Attempt "
|
||||
+ (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
|
||||
+ " event could not be published for "
|
||||
|
|
|
@ -18,6 +18,10 @@
|
|||
|
||||
package org.apache.hadoop.yarn.applications.distributedshell;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
|
@ -27,6 +31,7 @@ import java.io.IOException;
|
|||
import java.io.OutputStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.InetAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -46,14 +51,24 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.net.ServerSocketUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.JarFinder;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster;
|
||||
import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
|
||||
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
|
||||
import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
|
||||
import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
|
@ -61,6 +76,7 @@ import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
|
|||
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
|
||||
import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
|
||||
import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -69,6 +85,8 @@ import org.junit.Rule;
|
|||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
import com.sun.jersey.api.client.ClientHandlerException;
|
||||
|
||||
public class TestDistributedShell {
|
||||
|
||||
private static final Log LOG =
|
||||
|
@ -77,6 +95,7 @@ public class TestDistributedShell {
|
|||
protected MiniYARNCluster yarnCluster = null;
|
||||
protected MiniDFSCluster hdfsCluster = null;
|
||||
private FileSystem fs = null;
|
||||
private TimelineWriter spyTimelineWriter;
|
||||
protected YarnConfiguration conf = null;
|
||||
private static final int NUM_NMS = 1;
|
||||
private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
|
||||
|
@ -865,6 +884,37 @@ public class TestDistributedShell {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDSTimelineClientWithConnectionRefuse() throws Exception {
|
||||
ApplicationMaster am = new ApplicationMaster();
|
||||
|
||||
TimelineClientImpl client = new TimelineClientImpl() {
|
||||
@Override
|
||||
protected TimelineWriter createTimelineWriter(Configuration conf,
|
||||
UserGroupInformation authUgi, com.sun.jersey.api.client.Client client,
|
||||
URI resURI) throws IOException {
|
||||
TimelineWriter timelineWriter =
|
||||
new DirectTimelineWriter(authUgi, client, resURI);
|
||||
spyTimelineWriter = spy(timelineWriter);
|
||||
return spyTimelineWriter;
|
||||
}
|
||||
};
|
||||
client.init(conf);
|
||||
client.start();
|
||||
TestTimelineClient.mockEntityClientResponse(spyTimelineWriter, null,
|
||||
false, true);
|
||||
try {
|
||||
UserGroupInformation ugi = mock(UserGroupInformation.class);
|
||||
when(ugi.getShortUserName()).thenReturn("user1");
|
||||
// verify no ClientHandlerException get thrown out.
|
||||
am.publishContainerEndEvent(client, ContainerStatus.newInstance(
|
||||
BuilderUtils.newContainerId(1, 1, 1, 1), ContainerState.COMPLETE, "",
|
||||
1), "domainId", ugi);
|
||||
} finally {
|
||||
client.stop();
|
||||
}
|
||||
}
|
||||
|
||||
protected void waitForNMsToRegister() throws Exception {
|
||||
int sec = 60;
|
||||
while (sec >= 0) {
|
||||
|
|
|
@ -298,7 +298,7 @@ public class TestTimelineClient {
|
|||
client.connectionRetry.getRetired());
|
||||
}
|
||||
|
||||
private static ClientResponse mockEntityClientResponse(
|
||||
public static ClientResponse mockEntityClientResponse(
|
||||
TimelineWriter spyTimelineWriter, ClientResponse.Status status,
|
||||
boolean hasError, boolean hasRuntimeError) {
|
||||
ClientResponse response = mock(ClientResponse.class);
|
||||
|
|
Loading…
Reference in New Issue