YARN-5523. Yarn running container log fetching causes OutOfMemoryError (Xuan Gong via Varun Saxena)

This commit is contained in:
Varun Saxena 2016-08-18 01:45:33 +05:30
parent f80a729832
commit e3037c5641
2 changed files with 150 additions and 19 deletions

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.client.cli;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -41,6 +42,7 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException; import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
@ -462,15 +464,7 @@ public class LogsCLI extends Configured implements Tool {
PrintStream out = logCliHelper.createPrintStream(localDir, nodeId, PrintStream out = logCliHelper.createPrintStream(localDir, nodeId,
containerIdStr); containerIdStr);
try { try {
// fetch all the log files for the container Set<String> matchedFiles = getMatchedContainerLogFiles(request,
// filter the log files based on the given -log_files pattern
List<PerLogFileInfo> allLogFileInfos=
getContainerLogFiles(getConf(), containerIdStr, nodeHttpAddress);
List<String> fileNames = new ArrayList<String>();
for (PerLogFileInfo fileInfo : allLogFileInfos) {
fileNames.add(fileInfo.getFileName());
}
Set<String> matchedFiles = getMatchedLogFiles(request, fileNames,
useRegex); useRegex);
if (matchedFiles.isEmpty()) { if (matchedFiles.isEmpty()) {
System.err.println("Can not find any log file matching the pattern: " System.err.println("Can not find any log file matching the pattern: "
@ -487,22 +481,33 @@ public class LogsCLI extends Configured implements Tool {
out.println(containerString); out.println(containerString);
out.println(StringUtils.repeat("=", containerString.length())); out.println(StringUtils.repeat("=", containerString.length()));
boolean foundAnyLogs = false; boolean foundAnyLogs = false;
byte[] buffer = new byte[65536];
for (String logFile : newOptions.getLogTypes()) { for (String logFile : newOptions.getLogTypes()) {
out.println("LogType:" + logFile); out.println("LogType:" + logFile);
out.println("Log Upload Time:" out.println("Log Upload Time:"
+ Times.format(System.currentTimeMillis())); + Times.format(System.currentTimeMillis()));
out.println("Log Contents:"); out.println("Log Contents:");
InputStream is = null;
try { try {
WebResource webResource = ClientResponse response = getResponeFromNMWebService(conf,
webServiceClient.resource(WebAppUtils.getHttpSchemePrefix(conf) webServiceClient, request, logFile);
+ nodeHttpAddress); if (response != null && response.getStatusInfo().getStatusCode() ==
ClientResponse response = ClientResponse.Status.OK.getStatusCode()) {
webResource.path("ws").path("v1").path("node") is = response.getEntityInputStream();
.path("containers").path(containerIdStr).path("logs") int len = 0;
.path(logFile) while((len = is.read(buffer)) != -1) {
.queryParam("size", Long.toString(request.getBytes())) out.write(buffer, 0, len);
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); }
out.println(response.getEntity(String.class)); out.println();
} else {
out.println("Can not get any logs for the log file: " + logFile);
String msg = "Response from the NodeManager:" + nodeId +
" WebService is " + ((response == null) ? "null":
"not successful," + " HTTP error code: " +
response.getStatus() + ", Server response:\n" +
response.getEntity(String.class));
out.println(msg);
}
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("End of LogType:" + logFile + "."); sb.append("End of LogType:" + logFile + ".");
if (request.getContainerState() == ContainerState.RUNNING) { if (request.getContainerState() == ContainerState.RUNNING) {
@ -517,6 +522,8 @@ public class LogsCLI extends Configured implements Tool {
System.err.println("Can not find the log file:" + logFile System.err.println("Can not find the log file:" + logFile
+ " for the container:" + containerIdStr + " in NodeManager:" + " for the container:" + containerIdStr + " in NodeManager:"
+ nodeId); + nodeId);
} finally {
IOUtils.closeQuietly(is);
} }
} }
// for the case, we have already uploaded partial logs in HDFS // for the case, we have already uploaded partial logs in HDFS
@ -1189,4 +1196,33 @@ public class LogsCLI extends Configured implements Tool {
this.fileLength = fileLength; this.fileLength = fileLength;
} }
} }
@VisibleForTesting
public Set<String> getMatchedContainerLogFiles(ContainerLogsRequest request,
boolean useRegex) throws IOException {
// fetch all the log files for the container
// filter the log files based on the given -log_files pattern
List<PerLogFileInfo> allLogFileInfos=
getContainerLogFiles(getConf(), request.getContainerId(),
request.getNodeHttpAddress());
List<String> fileNames = new ArrayList<String>();
for (PerLogFileInfo fileInfo : allLogFileInfos) {
fileNames.add(fileInfo.getFileName());
}
return getMatchedLogFiles(request, fileNames,
useRegex);
}
@VisibleForTesting
public ClientResponse getResponeFromNMWebService(Configuration conf,
Client webServiceClient, ContainerLogsRequest request, String logFile) {
WebResource webResource =
webServiceClient.resource(WebAppUtils.getHttpSchemePrefix(conf)
+ request.getNodeHttpAddress());
return webResource.path("ws").path("v1").path("node")
.path("containers").path(request.getContainerId()).path("logs")
.path(logFile)
.queryParam("size", Long.toString(request.getBytes()))
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
}
} }

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -30,10 +31,14 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter; import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
@ -63,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
@ -581,6 +587,95 @@ public class TestLogsCLI {
fs.delete(new Path(rootLogDir), true); fs.delete(new Path(rootLogDir), true);
} }
@Test (timeout = 5000)
public void testGetRunningContainerLogs() throws Exception {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
NodeId nodeId = NodeId.newInstance("localhost", 1234);
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId = ApplicationAttemptId
.newInstance(appId, 1);
// Create a mock ApplicationAttempt Report
ApplicationAttemptReport mockAttemptReport = mock(
ApplicationAttemptReport.class);
doReturn(appAttemptId).when(mockAttemptReport).getApplicationAttemptId();
List<ApplicationAttemptReport> attemptReports = Arrays.asList(
mockAttemptReport);
// Create one mock containerReport
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
ContainerReport mockContainerReport1 = mock(ContainerReport.class);
doReturn(containerId1).when(mockContainerReport1).getContainerId();
doReturn(nodeId).when(mockContainerReport1).getAssignedNode();
doReturn("http://localhost:2345").when(mockContainerReport1)
.getNodeHttpAddress();
doReturn(ContainerState.RUNNING).when(mockContainerReport1)
.getContainerState();
List<ContainerReport> containerReports = Arrays.asList(
mockContainerReport1);
// Mock the YarnClient, and it would report the previous created
// mockAttemptReport and previous two created mockContainerReports
YarnClient mockYarnClient = createMockYarnClient(
YarnApplicationState.RUNNING, ugi.getShortUserName(), true,
attemptReports, containerReports);
doReturn(mockContainerReport1).when(mockYarnClient).getContainerReport(
any(ContainerId.class));
// create local logs
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(configuration);
String rootLogDir = "target/LocalLogs";
Path rootLogDirPath = new Path(rootLogDir);
if (fs.exists(rootLogDirPath)) {
fs.delete(rootLogDirPath, true);
}
assertTrue(fs.mkdirs(rootLogDirPath));
Path appLogsDir = new Path(rootLogDirPath, appId.toString());
if (fs.exists(appLogsDir)) {
fs.delete(appLogsDir, true);
}
assertTrue(fs.mkdirs(appLogsDir));
String fileName = "syslog";
List<String> logTypes = new ArrayList<String>();
logTypes.add(fileName);
// create container logs in localLogDir
createContainerLogInLocalDir(appLogsDir, containerId1, fs, logTypes);
Path containerDirPath = new Path(appLogsDir, containerId1.toString());
Path logPath = new Path(containerDirPath, fileName);
File logFile = new File(logPath.toString());
final FileInputStream fis = new FileInputStream(logFile);
try {
LogsCLI cli = spy(new LogsCLIForTest(mockYarnClient));
Set<String> logsSet = new HashSet<String>();
logsSet.add(fileName);
doReturn(logsSet).when(cli).getMatchedContainerLogFiles(
any(ContainerLogsRequest.class), anyBoolean());
ClientResponse mockReponse = mock(ClientResponse.class);
doReturn(Status.OK).when(mockReponse).getStatusInfo();
doReturn(fis).when(mockReponse).getEntityInputStream();
doReturn(mockReponse).when(cli).getResponeFromNMWebService(
any(Configuration.class),
any(Client.class),
any(ContainerLogsRequest.class), anyString());
cli.setConf(new YarnConfiguration());
int exitCode = cli.run(new String[] {"-containerId",
containerId1.toString()});
assertTrue(exitCode == 0);
assertTrue(sysOutStream.toString().contains(
logMessage(containerId1, "syslog")));
sysOutStream.reset();
} finally {
IOUtils.closeQuietly(fis);
fs.delete(new Path(rootLogDir), true);
}
}
@Test (timeout = 5000) @Test (timeout = 5000)
public void testFetchRunningApplicationLogs() throws Exception { public void testFetchRunningApplicationLogs() throws Exception {