YARN-5088. Improve "yarn log" command-line to read the last K bytes for the log files. Contributed by Xuan Gong

This commit is contained in:
Xuan 2016-06-01 13:44:21 -07:00
parent 35356de1ba
commit 0bc05e40fa
9 changed files with 384 additions and 43 deletions

View File

@ -88,6 +88,7 @@ public class LogsCLI extends Configured implements Tool {
private static final String SHOW_META_INFO = "show_meta_info"; private static final String SHOW_META_INFO = "show_meta_info";
private static final String LIST_NODES_OPTION = "list_nodes"; private static final String LIST_NODES_OPTION = "list_nodes";
private static final String OUT_OPTION = "out"; private static final String OUT_OPTION = "out";
private static final String SIZE_OPTION = "size";
public static final String HELP_CMD = "help"; public static final String HELP_CMD = "help";
@Override @Override
@ -113,6 +114,7 @@ public class LogsCLI extends Configured implements Tool {
String[] logFiles = null; String[] logFiles = null;
List<String> amContainersList = new ArrayList<String>(); List<String> amContainersList = new ArrayList<String>();
String localDir = null; String localDir = null;
long bytes = Long.MAX_VALUE;
try { try {
CommandLine commandLine = parser.parse(opts, args, true); CommandLine commandLine = parser.parse(opts, args, true);
appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION); appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION);
@ -134,6 +136,9 @@ public class LogsCLI extends Configured implements Tool {
if (commandLine.hasOption(CONTAINER_LOG_FILES)) { if (commandLine.hasOption(CONTAINER_LOG_FILES)) {
logFiles = commandLine.getOptionValues(CONTAINER_LOG_FILES); logFiles = commandLine.getOptionValues(CONTAINER_LOG_FILES);
} }
if (commandLine.hasOption(SIZE_OPTION)) {
bytes = Long.parseLong(commandLine.getOptionValue(SIZE_OPTION));
}
} catch (ParseException e) { } catch (ParseException e) {
System.err.println("options parsing failed: " + e.getMessage()); System.err.println("options parsing failed: " + e.getMessage());
printHelpMessage(printOpts); printHelpMessage(printOpts);
@ -195,7 +200,7 @@ public class LogsCLI extends Configured implements Tool {
ContainerLogsRequest request = new ContainerLogsRequest(appId, ContainerLogsRequest request = new ContainerLogsRequest(appId,
isApplicationFinished(appState), appOwner, nodeAddress, null, isApplicationFinished(appState), appOwner, nodeAddress, null,
containerIdStr, localDir, logs); containerIdStr, localDir, logs, bytes);
if (showMetaInfo) { if (showMetaInfo) {
return showMetaInfo(request, logCliHelper); return showMetaInfo(request, logCliHelper);
@ -402,6 +407,7 @@ public class LogsCLI extends Configured implements Tool {
ClientResponse response = ClientResponse response =
webResource.path("ws").path("v1").path("node") webResource.path("ws").path("v1").path("node")
.path("containerlogs").path(containerIdStr).path(logFile) .path("containerlogs").path(containerIdStr).path(logFile)
.queryParam("size", Long.toString(request.getBytes()))
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
out.println(response.getEntity(String.class)); out.println(response.getEntity(String.class));
out.println("End of LogType:" + logFile); out.println("End of LogType:" + logFile);
@ -442,7 +448,9 @@ public class LogsCLI extends Configured implements Tool {
newOptions); newOptions);
} }
private ContainerReport getContainerReport(String containerIdStr) @Private
@VisibleForTesting
public ContainerReport getContainerReport(String containerIdStr)
throws YarnException, IOException { throws YarnException, IOException {
YarnClient yarnClient = createYarnClient(); YarnClient yarnClient = createYarnClient();
try { try {
@ -636,12 +644,16 @@ public class LogsCLI extends Configured implements Tool {
opts.addOption(OUT_OPTION, true, "Local directory for storing individual " opts.addOption(OUT_OPTION, true, "Local directory for storing individual "
+ "container logs. The container logs will be stored based on the " + "container logs. The container logs will be stored based on the "
+ "node the container ran on."); + "node the container ran on.");
opts.addOption(SIZE_OPTION, true, "Prints the log file's first 'n' bytes "
+ "or the last 'n' bytes. Use negative values as bytes to read from "
+ "the end and positive values as bytes to read from the beginning.");
opts.getOption(APPLICATION_ID_OPTION).setArgName("Application ID"); opts.getOption(APPLICATION_ID_OPTION).setArgName("Application ID");
opts.getOption(CONTAINER_ID_OPTION).setArgName("Container ID"); opts.getOption(CONTAINER_ID_OPTION).setArgName("Container ID");
opts.getOption(NODE_ADDRESS_OPTION).setArgName("Node Address"); opts.getOption(NODE_ADDRESS_OPTION).setArgName("Node Address");
opts.getOption(APP_OWNER_OPTION).setArgName("Application Owner"); opts.getOption(APP_OWNER_OPTION).setArgName("Application Owner");
opts.getOption(AM_CONTAINER_OPTION).setArgName("AM Containers"); opts.getOption(AM_CONTAINER_OPTION).setArgName("AM Containers");
opts.getOption(OUT_OPTION).setArgName("Local Directory"); opts.getOption(OUT_OPTION).setArgName("Local Directory");
opts.getOption(SIZE_OPTION).setArgName("size");
return opts; return opts;
} }
@ -656,6 +668,7 @@ public class LogsCLI extends Configured implements Tool {
printOpts.addOption(commandOpts.getOption(SHOW_META_INFO)); printOpts.addOption(commandOpts.getOption(SHOW_META_INFO));
printOpts.addOption(commandOpts.getOption(LIST_NODES_OPTION)); printOpts.addOption(commandOpts.getOption(LIST_NODES_OPTION));
printOpts.addOption(commandOpts.getOption(OUT_OPTION)); printOpts.addOption(commandOpts.getOption(OUT_OPTION));
printOpts.addOption(commandOpts.getOption(SIZE_OPTION));
return printOpts; return printOpts;
} }

View File

@ -203,6 +203,11 @@ public class TestLogsCLI {
pw.println(" for all the containers on the specific"); pw.println(" for all the containers on the specific");
pw.println(" NodeManager. Currently, this option can"); pw.println(" NodeManager. Currently, this option can");
pw.println(" only be used for finished applications."); pw.println(" only be used for finished applications.");
pw.println(" -size <size> Prints the log file's first 'n' bytes or");
pw.println(" the last 'n' bytes. Use negative values");
pw.println(" as bytes to read from the end and");
pw.println(" positive values as bytes to read from the");
pw.println(" beginning.");
pw.close(); pw.close();
String appReportStr = baos.toString("UTF-8"); String appReportStr = baos.toString("UTF-8");
Assert.assertEquals(appReportStr, sysOutStream.toString()); Assert.assertEquals(appReportStr, sysOutStream.toString());
@ -227,7 +232,7 @@ public class TestLogsCLI {
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2); ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2);
ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3); ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3);
NodeId nodeId = NodeId.newInstance("localhost", 1234); final NodeId nodeId = NodeId.newInstance("localhost", 1234);
// create local logs // create local logs
String rootLogDir = "target/LocalLogs"; String rootLogDir = "target/LocalLogs";
@ -281,7 +286,16 @@ public class TestLogsCLI {
YarnClient mockYarnClient = YarnClient mockYarnClient =
createMockYarnClient( createMockYarnClient(
YarnApplicationState.FINISHED, ugi.getShortUserName()); YarnApplicationState.FINISHED, ugi.getShortUserName());
LogsCLI cli = new LogsCLIForTest(mockYarnClient); LogsCLI cli = new LogsCLIForTest(mockYarnClient) {
@Override
public ContainerReport getContainerReport(String containerIdStr)
throws YarnException, IOException {
ContainerReport mockReport = mock(ContainerReport.class);
doReturn(nodeId).when(mockReport).getAssignedNode();
doReturn("http://localhost:2345").when(mockReport).getNodeHttpAddress();
return mockReport;
}
};
cli.setConf(configuration); cli.setConf(configuration);
int exitCode = cli.run(new String[] { "-applicationId", appId.toString() }); int exitCode = cli.run(new String[] { "-applicationId", appId.toString() });
@ -307,6 +321,7 @@ public class TestLogsCLI {
"Hello container_0_0001_01_000003 in syslog!")); "Hello container_0_0001_01_000003 in syslog!"));
assertTrue(sysOutStream.toString().contains( assertTrue(sysOutStream.toString().contains(
"Hello container_0_0001_01_000003 in stdout!")); "Hello container_0_0001_01_000003 in stdout!"));
int fullSize = sysOutStream.toByteArray().length;
sysOutStream.reset(); sysOutStream.reset();
exitCode = cli.run(new String[] {"-applicationId", appId.toString(), exitCode = cli.run(new String[] {"-applicationId", appId.toString(),
@ -329,6 +344,14 @@ public class TestLogsCLI {
"Can not find any log file matching the pattern: [123]")); "Can not find any log file matching the pattern: [123]"));
sysErrStream.reset(); sysErrStream.reset();
// specify the bytes which is larger than the actual file size,
// we would get the full logs
exitCode = cli.run(new String[] {"-applicationId", appId.toString(),
"-logFiles", ".*", "-size", "10000" });
assertTrue(exitCode == 0);
assertTrue(sysOutStream.toByteArray().length == fullSize);
sysOutStream.reset();
// uploaded two logs for container1. The first log is empty. // uploaded two logs for container1. The first log is empty.
// The second one is not empty. // The second one is not empty.
// We can still successfully read logs for container1. // We can still successfully read logs for container1.
@ -345,6 +368,49 @@ public class TestLogsCLI {
+ " are not present in this log-file.")); + " are not present in this log-file."));
sysOutStream.reset(); sysOutStream.reset();
exitCode = cli.run(new String[] {"-applicationId", appId.toString(),
"-containerId", containerId3.toString(), "-logFiles", "stdout" });
assertTrue(exitCode == 0);
int fullContextSize = sysOutStream.toByteArray().length;
String fullContext = sysOutStream.toString();
sysOutStream.reset();
String logMessage = "Hello container_0_0001_01_000003 in stdout!";
int fileContentSize = logMessage.getBytes().length;
int tailContentSize = "End of LogType:syslog\n\n".getBytes().length;
// specify how many bytes we should get from logs
// specify a position number, it would get the first n bytes from
// container log
exitCode = cli.run(new String[] {"-applicationId", appId.toString(),
"-containerId", containerId3.toString(), "-logFiles", "stdout",
"-size", "5"});
assertTrue(exitCode == 0);
Assert.assertEquals(new String(logMessage.getBytes(), 0, 5),
new String(sysOutStream.toByteArray(),
(fullContextSize - fileContentSize - tailContentSize), 5));
sysOutStream.reset();
// specify a negative number, it would get the last n bytes from
// container log
exitCode = cli.run(new String[] {"-applicationId", appId.toString(),
"-containerId", containerId3.toString(), "-logFiles", "stdout",
"-size", "-5"});
assertTrue(exitCode == 0);
Assert.assertEquals(new String(logMessage.getBytes(),
logMessage.getBytes().length - 5, 5),
new String(sysOutStream.toByteArray(),
(fullContextSize - fileContentSize - tailContentSize), 5));
sysOutStream.reset();
long negative = (fullContextSize + 1000) * (-1);
exitCode = cli.run(new String[] {"-applicationId", appId.toString(),
"-containerId", containerId3.toString(), "-logFiles", "stdout",
"-size", Long.toString(negative)});
assertTrue(exitCode == 0);
Assert.assertEquals(fullContext, sysOutStream.toString());
sysOutStream.reset();
// Uploaded the empty log for container0. // Uploaded the empty log for container0.
// We should see the message showing the log for container0 // We should see the message showing the log for container0
// are not present. // are not present.

View File

@ -733,7 +733,7 @@ public class AggregatedLogFormat {
ps = new PrintStream(os); ps = new PrintStream(os);
while (true) { while (true) {
try { try {
readContainerLogs(valueStream, ps, logUploadedTime); readContainerLogs(valueStream, ps, logUploadedTime, Long.MAX_VALUE);
} catch (EOFException e) { } catch (EOFException e) {
// EndOfFile // EndOfFile
return; return;
@ -757,7 +757,8 @@ public class AggregatedLogFormat {
} }
private static void readContainerLogs(DataInputStream valueStream, private static void readContainerLogs(DataInputStream valueStream,
PrintStream out, long logUploadedTime) throws IOException { PrintStream out, long logUploadedTime, long bytes)
throws IOException {
byte[] buf = new byte[65535]; byte[] buf = new byte[65535];
String fileType = valueStream.readUTF(); String fileType = valueStream.readUTF();
@ -773,16 +774,35 @@ public class AggregatedLogFormat {
out.println(fileLengthStr); out.println(fileLengthStr);
out.println("Log Contents:"); out.println("Log Contents:");
long toSkip = 0;
long totalBytesToRead = fileLength;
if (bytes < 0) {
long absBytes = Math.abs(bytes);
if (absBytes < fileLength) {
toSkip = fileLength - absBytes;
totalBytesToRead = absBytes;
}
long skippedBytes = valueStream.skip(toSkip);
if (skippedBytes != toSkip) {
throw new IOException("The bytes were skipped are "
+ "different from the caller requested");
}
} else {
if (bytes < fileLength) {
totalBytesToRead = bytes;
}
}
long curRead = 0; long curRead = 0;
long pendingRead = fileLength - curRead; long pendingRead = totalBytesToRead - curRead;
int toRead = int toRead =
pendingRead > buf.length ? buf.length : (int) pendingRead; pendingRead > buf.length ? buf.length : (int) pendingRead;
int len = valueStream.read(buf, 0, toRead); int len = valueStream.read(buf, 0, toRead);
while (len != -1 && curRead < fileLength) { while (len != -1 && curRead < totalBytesToRead) {
out.write(buf, 0, len); out.write(buf, 0, len);
curRead += len; curRead += len;
pendingRead = fileLength - curRead; pendingRead = totalBytesToRead - curRead;
toRead = toRead =
pendingRead > buf.length ? buf.length : (int) pendingRead; pendingRead > buf.length ? buf.length : (int) pendingRead;
len = valueStream.read(buf, 0, toRead); len = valueStream.read(buf, 0, toRead);
@ -803,7 +823,23 @@ public class AggregatedLogFormat {
public static void readAContainerLogsForALogType( public static void readAContainerLogsForALogType(
DataInputStream valueStream, PrintStream out, long logUploadedTime) DataInputStream valueStream, PrintStream out, long logUploadedTime)
throws IOException { throws IOException {
readContainerLogs(valueStream, out, logUploadedTime); readContainerLogs(valueStream, out, logUploadedTime, Long.MAX_VALUE);
}
/**
* Keep calling this till you get a {@link EOFException} for getting logs of
* all types for a single container for the specific bytes.
*
* @param valueStream
* @param out
* @param logUploadedTime
* @param bytes
* @throws IOException
*/
public static void readAContainerLogsForALogType(
DataInputStream valueStream, PrintStream out, long logUploadedTime,
long bytes) throws IOException {
readContainerLogs(valueStream, out, logUploadedTime, bytes);
} }
/** /**
@ -832,6 +868,22 @@ public class AggregatedLogFormat {
public static int readContainerLogsForALogType( public static int readContainerLogsForALogType(
DataInputStream valueStream, PrintStream out, long logUploadedTime, DataInputStream valueStream, PrintStream out, long logUploadedTime,
List<String> logType) throws IOException { List<String> logType) throws IOException {
return readContainerLogsForALogType(valueStream, out, logUploadedTime,
logType, Long.MAX_VALUE);
}
/**
* Keep calling this till you get a {@link EOFException} for getting logs of
* the specific types for a single container.
* @param valueStream
* @param out
* @param logUploadedTime
* @param logType
* @throws IOException
*/
public static int readContainerLogsForALogType(
DataInputStream valueStream, PrintStream out, long logUploadedTime,
List<String> logType, long bytes) throws IOException {
byte[] buf = new byte[65535]; byte[] buf = new byte[65535];
String fileType = valueStream.readUTF(); String fileType = valueStream.readUTF();
@ -848,15 +900,34 @@ public class AggregatedLogFormat {
out.println(fileLengthStr); out.println(fileLengthStr);
out.println("Log Contents:"); out.println("Log Contents:");
long toSkip = 0;
long totalBytesToRead = fileLength;
if (bytes < 0) {
long absBytes = Math.abs(bytes);
if (absBytes < fileLength) {
toSkip = fileLength - absBytes;
totalBytesToRead = absBytes;
}
long skippedBytes = valueStream.skip(toSkip);
if (skippedBytes != toSkip) {
throw new IOException("The bytes were skipped are "
+ "different from the caller requested");
}
} else {
if (bytes < fileLength) {
totalBytesToRead = bytes;
}
}
long curRead = 0; long curRead = 0;
long pendingRead = fileLength - curRead; long pendingRead = totalBytesToRead - curRead;
int toRead = pendingRead > buf.length ? buf.length : (int) pendingRead; int toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
int len = valueStream.read(buf, 0, toRead); int len = valueStream.read(buf, 0, toRead);
while (len != -1 && curRead < fileLength) { while (len != -1 && curRead < totalBytesToRead) {
out.write(buf, 0, len); out.write(buf, 0, len);
curRead += len; curRead += len;
pendingRead = fileLength - curRead; pendingRead = totalBytesToRead - curRead;
toRead = pendingRead > buf.length ? buf.length : (int) pendingRead; toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
len = valueStream.read(buf, 0, toRead); len = valueStream.read(buf, 0, toRead);
} }

View File

@ -30,6 +30,7 @@ public class ContainerLogsRequest {
private boolean appFinished; private boolean appFinished;
private String outputLocalDir; private String outputLocalDir;
private List<String> logTypes; private List<String> logTypes;
private long bytes;
public ContainerLogsRequest() {} public ContainerLogsRequest() {}
@ -42,12 +43,13 @@ public class ContainerLogsRequest {
this.setContainerId(request.getContainerId()); this.setContainerId(request.getContainerId());
this.setOutputLocalDir(request.getOutputLocalDir()); this.setOutputLocalDir(request.getOutputLocalDir());
this.setLogTypes(request.getLogTypes()); this.setLogTypes(request.getLogTypes());
this.setBytes(request.getBytes());
} }
public ContainerLogsRequest(ApplicationId applicationId, public ContainerLogsRequest(ApplicationId applicationId,
boolean isAppFinished, String owner, boolean isAppFinished, String owner,
String address, String httpAddress, String container, String localDir, String address, String httpAddress, String container, String localDir,
List<String> logs) { List<String> logs, long bytes) {
this.setAppId(applicationId); this.setAppId(applicationId);
this.setAppFinished(isAppFinished); this.setAppFinished(isAppFinished);
this.setAppOwner(owner); this.setAppOwner(owner);
@ -56,6 +58,7 @@ public class ContainerLogsRequest {
this.setContainerId(container); this.setContainerId(container);
this.setOutputLocalDir(localDir); this.setOutputLocalDir(localDir);
this.setLogTypes(logs); this.setLogTypes(logs);
this.setBytes(bytes);
} }
public ApplicationId getAppId() { public ApplicationId getAppId() {
@ -121,4 +124,12 @@ public class ContainerLogsRequest {
public void setLogTypes(List<String> logTypes) { public void setLogTypes(List<String> logTypes) {
this.logTypes = logTypes; this.logTypes = logTypes;
} }
public long getBytes() {
return bytes;
}
public void setBytes(long bytes) {
this.bytes = bytes;
}
} }

View File

@ -65,6 +65,7 @@ public class LogCLIHelpers implements Configurable {
options.setAppOwner(jobOwner); options.setAppOwner(jobOwner);
List<String> logs = new ArrayList<String>(); List<String> logs = new ArrayList<String>();
options.setLogTypes(logs); options.setLogTypes(logs);
options.setBytes(Long.MAX_VALUE);
return dumpAContainersLogsForALogType(options, false); return dumpAContainersLogsForALogType(options, false);
} }
@ -160,12 +161,13 @@ public class LogCLIHelpers implements Configurable {
thisNodeFile.getPath()); thisNodeFile.getPath());
if (logType == null || logType.isEmpty()) { if (logType == null || logType.isEmpty()) {
if (dumpAContainerLogs(containerId, reader, out, if (dumpAContainerLogs(containerId, reader, out,
thisNodeFile.getModificationTime()) > -1) { thisNodeFile.getModificationTime(), options.getBytes()) > -1) {
foundContainerLogs = true; foundContainerLogs = true;
} }
} else { } else {
if (dumpAContainerLogsForALogType(containerId, reader, out, if (dumpAContainerLogsForALogType(containerId, reader, out,
thisNodeFile.getModificationTime(), logType) > -1) { thisNodeFile.getModificationTime(), logType,
options.getBytes()) > -1) {
foundContainerLogs = true; foundContainerLogs = true;
} }
} }
@ -222,12 +224,13 @@ public class LogCLIHelpers implements Configurable {
out.println(StringUtils.repeat("=", containerId.length())); out.println(StringUtils.repeat("=", containerId.length()));
if (logType == null || logType.isEmpty()) { if (logType == null || logType.isEmpty()) {
if (dumpAContainerLogs(containerId, reader, out, if (dumpAContainerLogs(containerId, reader, out,
thisNodeFile.getModificationTime()) > -1) { thisNodeFile.getModificationTime(), options.getBytes()) > -1) {
foundContainerLogs = true; foundContainerLogs = true;
} }
} else { } else {
if (dumpAContainerLogsForALogType(containerId, reader, out, if (dumpAContainerLogsForALogType(containerId, reader, out,
thisNodeFile.getModificationTime(), logType) > -1) { thisNodeFile.getModificationTime(), logType,
options.getBytes()) > -1) {
foundContainerLogs = true; foundContainerLogs = true;
} }
} }
@ -249,7 +252,7 @@ public class LogCLIHelpers implements Configurable {
@Private @Private
public int dumpAContainerLogs(String containerIdStr, public int dumpAContainerLogs(String containerIdStr,
AggregatedLogFormat.LogReader reader, PrintStream out, AggregatedLogFormat.LogReader reader, PrintStream out,
long logUploadedTime) throws IOException { long logUploadedTime, long bytes) throws IOException {
DataInputStream valueStream = getContainerLogsStream( DataInputStream valueStream = getContainerLogsStream(
containerIdStr, reader); containerIdStr, reader);
@ -261,7 +264,7 @@ public class LogCLIHelpers implements Configurable {
while (true) { while (true) {
try { try {
LogReader.readAContainerLogsForALogType(valueStream, out, LogReader.readAContainerLogsForALogType(valueStream, out,
logUploadedTime); logUploadedTime, bytes);
foundContainerLogs = true; foundContainerLogs = true;
} catch (EOFException eof) { } catch (EOFException eof) {
break; break;
@ -290,7 +293,8 @@ public class LogCLIHelpers implements Configurable {
@Private @Private
public int dumpAContainerLogsForALogType(String containerIdStr, public int dumpAContainerLogsForALogType(String containerIdStr,
AggregatedLogFormat.LogReader reader, PrintStream out, AggregatedLogFormat.LogReader reader, PrintStream out,
long logUploadedTime, List<String> logType) throws IOException { long logUploadedTime, List<String> logType, long bytes)
throws IOException {
DataInputStream valueStream = getContainerLogsStream( DataInputStream valueStream = getContainerLogsStream(
containerIdStr, reader); containerIdStr, reader);
if (valueStream == null) { if (valueStream == null) {
@ -301,7 +305,7 @@ public class LogCLIHelpers implements Configurable {
while (true) { while (true) {
try { try {
int result = LogReader.readContainerLogsForALogType( int result = LogReader.readContainerLogsForALogType(
valueStream, out, logUploadedTime, logType); valueStream, out, logUploadedTime, logType, bytes);
if (result == 0) { if (result == 0) {
foundContainerLogs = true; foundContainerLogs = true;
} }
@ -361,12 +365,13 @@ public class LogCLIHelpers implements Configurable {
try { try {
if (logTypes == null || logTypes.isEmpty()) { if (logTypes == null || logTypes.isEmpty()) {
LogReader.readAContainerLogsForALogType(valueStream, out, LogReader.readAContainerLogsForALogType(valueStream, out,
thisNodeFile.getModificationTime()); thisNodeFile.getModificationTime(),
options.getBytes());
foundAnyLogs = true; foundAnyLogs = true;
} else { } else {
int result = LogReader.readContainerLogsForALogType( int result = LogReader.readContainerLogsForALogType(
valueStream, out, thisNodeFile.getModificationTime(), valueStream, out, thisNodeFile.getModificationTime(),
logTypes); logTypes, options.getBytes());
if (result == 0) { if (result == 0) {
foundAnyLogs = true; foundAnyLogs = true;
} }

View File

@ -213,7 +213,8 @@ public class AHSWebServices extends WebServices {
@Context HttpServletResponse res, @Context HttpServletResponse res,
@PathParam("containerid") String containerIdStr, @PathParam("containerid") String containerIdStr,
@PathParam("filename") String filename, @PathParam("filename") String filename,
@QueryParam("download") String download) { @QueryParam("download") String download,
@QueryParam("size") String size) {
init(res); init(res);
ContainerId containerId; ContainerId containerId;
try { try {
@ -225,6 +226,9 @@ public class AHSWebServices extends WebServices {
boolean downloadFile = parseBooleanParam(download); boolean downloadFile = parseBooleanParam(download);
final long length = parseLongParam(size);
ApplicationId appId = containerId.getApplicationAttemptId() ApplicationId appId = containerId.getApplicationAttemptId()
.getApplicationId(); .getApplicationId();
AppInfo appInfo; AppInfo appInfo;
@ -233,7 +237,7 @@ public class AHSWebServices extends WebServices {
} catch (Exception ex) { } catch (Exception ex) {
// directly find logs from HDFS. // directly find logs from HDFS.
return sendStreamOutputResponse(appId, null, null, containerIdStr, return sendStreamOutputResponse(appId, null, null, containerIdStr,
filename, downloadFile); filename, downloadFile, length);
} }
String appOwner = appInfo.getUser(); String appOwner = appInfo.getUser();
@ -247,7 +251,7 @@ public class AHSWebServices extends WebServices {
if (isFinishedState(appInfo.getAppState())) { if (isFinishedState(appInfo.getAppState())) {
// directly find logs from HDFS. // directly find logs from HDFS.
return sendStreamOutputResponse(appId, appOwner, null, containerIdStr, return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
filename, downloadFile); filename, downloadFile, length);
} }
return createBadResponse(Status.INTERNAL_SERVER_ERROR, return createBadResponse(Status.INTERNAL_SERVER_ERROR,
"Can not get ContainerInfo for the container: " + containerId); "Can not get ContainerInfo for the container: " + containerId);
@ -267,7 +271,7 @@ public class AHSWebServices extends WebServices {
return response.build(); return response.build();
} else if (isFinishedState(appInfo.getAppState())) { } else if (isFinishedState(appInfo.getAppState())) {
return sendStreamOutputResponse(appId, appOwner, nodeId, return sendStreamOutputResponse(appId, appOwner, nodeId,
containerIdStr, filename, downloadFile); containerIdStr, filename, downloadFile, length);
} else { } else {
return createBadResponse(Status.NOT_FOUND, return createBadResponse(Status.NOT_FOUND,
"The application is not at Running or Finished State."); "The application is not at Running or Finished State.");
@ -296,11 +300,11 @@ public class AHSWebServices extends WebServices {
private Response sendStreamOutputResponse(ApplicationId appId, private Response sendStreamOutputResponse(ApplicationId appId,
String appOwner, String nodeId, String containerIdStr, String appOwner, String nodeId, String containerIdStr,
String fileName, boolean downloadFile) { String fileName, boolean downloadFile, long bytes) {
StreamingOutput stream = null; StreamingOutput stream = null;
try { try {
stream = getStreamingOutput(appId, appOwner, nodeId, stream = getStreamingOutput(appId, appOwner, nodeId,
containerIdStr, fileName); containerIdStr, fileName, bytes);
} catch (Exception ex) { } catch (Exception ex) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR, return createBadResponse(Status.INTERNAL_SERVER_ERROR,
ex.getMessage()); ex.getMessage());
@ -318,7 +322,7 @@ public class AHSWebServices extends WebServices {
private StreamingOutput getStreamingOutput(ApplicationId appId, private StreamingOutput getStreamingOutput(ApplicationId appId,
String appOwner, final String nodeId, final String containerIdStr, String appOwner, final String nodeId, final String containerIdStr,
final String logFile) throws IOException{ final String logFile, final long bytes) throws IOException{
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
org.apache.hadoop.fs.Path remoteRootLogDir = new org.apache.hadoop.fs.Path( org.apache.hadoop.fs.Path remoteRootLogDir = new org.apache.hadoop.fs.Path(
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
@ -391,16 +395,35 @@ public class AHSWebServices extends WebServices {
byte[] b = sb.toString().getBytes(Charset.forName("UTF-8")); byte[] b = sb.toString().getBytes(Charset.forName("UTF-8"));
os.write(b, 0, b.length); os.write(b, 0, b.length);
long toSkip = 0;
long totalBytesToRead = fileLength;
if (bytes < 0) {
long absBytes = Math.abs(bytes);
if (absBytes < fileLength) {
toSkip = fileLength - absBytes;
totalBytesToRead = absBytes;
}
long skippedBytes = valueStream.skip(toSkip);
if (skippedBytes != toSkip) {
throw new IOException("The bytes were skipped are "
+ "different from the caller requested");
}
} else {
if (bytes < fileLength) {
totalBytesToRead = bytes;
}
}
long curRead = 0; long curRead = 0;
long pendingRead = fileLength - curRead; long pendingRead = totalBytesToRead - curRead;
int toRead = pendingRead > buf.length ? buf.length int toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead; : (int) pendingRead;
int len = valueStream.read(buf, 0, toRead); int len = valueStream.read(buf, 0, toRead);
while (len != -1 && curRead < fileLength) { while (len != -1 && curRead < totalBytesToRead) {
os.write(buf, 0, len); os.write(buf, 0, len);
curRead += len; curRead += len;
pendingRead = fileLength - curRead; pendingRead = totalBytesToRead - curRead;
toRead = pendingRead > buf.length ? buf.length toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead; : (int) pendingRead;
len = valueStream.read(buf, 0, toRead); len = valueStream.read(buf, 0, toRead);
@ -433,4 +456,11 @@ public class AHSWebServices extends WebServices {
}; };
return stream; return stream;
} }
private long parseLongParam(String bytes) {
if (bytes == null || bytes.isEmpty()) {
return Long.MAX_VALUE;
}
return Long.parseLong(bytes);
}
} }

View File

@ -601,6 +601,72 @@ public class TestAHSWebServices extends JerseyTestBase {
.get(ClientResponse.class); .get(ClientResponse.class);
responseText = response.getEntity(String.class); responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Hello." + containerId1ForApp100)); assertTrue(responseText.contains("Hello." + containerId1ForApp100));
int fullTextSize = responseText.getBytes().length;
int tailTextSize = "\nEnd of LogType:syslog\n".getBytes().length;
String logMessage = "Hello." + containerId1ForApp100;
int fileContentSize = logMessage.getBytes().length;
// specify how many bytes we should get from logs
// if we specify a position number, it would get the first n bytes from
// container log
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1ForApp100.toString()).path(fileName)
.queryParam("user.name", user)
.queryParam("size", "5")
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertEquals(responseText.getBytes().length,
(fullTextSize - fileContentSize) + 5);
assertTrue(fullTextSize >= responseText.getBytes().length);
assertEquals(new String(responseText.getBytes(),
(fullTextSize - fileContentSize - tailTextSize), 5),
new String(logMessage.getBytes(), 0, 5));
// specify how many bytes we should get from logs
// if we specify a negative number, it would get the last n bytes from
// container log
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1ForApp100.toString()).path(fileName)
.queryParam("user.name", user)
.queryParam("size", "-5")
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertEquals(responseText.getBytes().length,
(fullTextSize - fileContentSize) + 5);
assertTrue(fullTextSize >= responseText.getBytes().length);
assertEquals(new String(responseText.getBytes(),
(fullTextSize - fileContentSize - tailTextSize), 5),
new String(logMessage.getBytes(), fileContentSize - 5, 5));
// specify the bytes which is larger than the actual file size,
// we would get the full logs
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1ForApp100.toString()).path(fileName)
.queryParam("user.name", user)
.queryParam("size", "10000")
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertEquals(responseText.getBytes().length, fullTextSize);
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1ForApp100.toString()).path(fileName)
.queryParam("user.name", user)
.queryParam("size", "-10000")
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertEquals(responseText.getBytes().length, fullTextSize);
} }
private static void createContainerLogInLocalDir(Path appLogsDir, private static void createContainerLogInLocalDir(Path appLogsDir,

View File

@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException; import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
@ -217,7 +216,8 @@ public class NMWebServices {
@Unstable @Unstable
public Response getLogs(@PathParam("containerid") String containerIdStr, public Response getLogs(@PathParam("containerid") String containerIdStr,
@PathParam("filename") String filename, @PathParam("filename") String filename,
@QueryParam("download") String download) { @QueryParam("download") String download,
@QueryParam("size") String size) {
ContainerId containerId; ContainerId containerId;
try { try {
containerId = ConverterUtils.toContainerId(containerIdStr); containerId = ConverterUtils.toContainerId(containerIdStr);
@ -235,9 +235,12 @@ public class NMWebServices {
return Response.serverError().entity(ex.getMessage()).build(); return Response.serverError().entity(ex.getMessage()).build();
} }
boolean downloadFile = parseBooleanParam(download); boolean downloadFile = parseBooleanParam(download);
final long bytes = parseLongParam(size);
try { try {
final FileInputStream fis = ContainerLogsUtils.openLogFileForRead( final FileInputStream fis = ContainerLogsUtils.openLogFileForRead(
containerIdStr, logFile, nmContext); containerIdStr, logFile, nmContext);
final long fileLength = logFile.length();
StreamingOutput stream = new StreamingOutput() { StreamingOutput stream = new StreamingOutput() {
@Override @Override
@ -245,9 +248,38 @@ public class NMWebServices {
WebApplicationException { WebApplicationException {
int bufferSize = 65536; int bufferSize = 65536;
byte[] buf = new byte[bufferSize]; byte[] buf = new byte[bufferSize];
int len; long toSkip = 0;
while ((len = fis.read(buf, 0, bufferSize)) > 0) { long totalBytesToRead = fileLength;
if (bytes < 0) {
long absBytes = Math.abs(bytes);
if (absBytes < fileLength) {
toSkip = fileLength - absBytes;
totalBytesToRead = absBytes;
}
long skippedBytes = fis.skip(toSkip);
if (skippedBytes != toSkip) {
throw new IOException("The bytes were skipped are different "
+ "from the caller requested");
}
} else {
if (bytes < fileLength) {
totalBytesToRead = bytes;
}
}
long curRead = 0;
long pendingRead = totalBytesToRead - curRead;
int toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
int len = fis.read(buf, 0, toRead);
while (len != -1 && curRead < totalBytesToRead) {
os.write(buf, 0, len); os.write(buf, 0, len);
curRead += len;
pendingRead = totalBytesToRead - curRead;
toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
len = fis.read(buf, 0, toRead);
} }
os.flush(); os.flush();
} }
@ -268,4 +300,11 @@ public class NMWebServices {
} }
return false; return false;
} }
private long parseLongParam(String bytes) {
if (bytes == null || bytes.isEmpty()) {
return Long.MAX_VALUE;
}
return Long.parseLong(bytes);
}
} }

View File

@ -26,17 +26,14 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringReader; import java.io.StringReader;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.DocumentBuilderFactory;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.NodeHealthScriptRunner;
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
@ -351,6 +348,49 @@ public class TestNMWebServices extends JerseyTestBase {
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
String responseText = response.getEntity(String.class); String responseText = response.getEntity(String.class);
assertEquals(logMessage, responseText); assertEquals(logMessage, responseText);
int fullTextSize = responseText.getBytes().length;
// specify how many bytes we should get from logs
// specify a position number, it would get the first n bytes from
// container log
response = r.path("ws").path("v1").path("node")
.path("containerlogs").path(containerIdStr).path(filename)
.queryParam("size", "5")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertEquals(5, responseText.getBytes().length);
assertEquals(new String(logMessage.getBytes(), 0, 5), responseText);
assertTrue(fullTextSize >= responseText.getBytes().length);
// specify the bytes which is larger than the actual file size,
// we would get the full logs
response = r.path("ws").path("v1").path("node")
.path("containerlogs").path(containerIdStr).path(filename)
.queryParam("size", "10000")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertEquals(fullTextSize, responseText.getBytes().length);
assertEquals(logMessage, responseText);
// specify a negative number, it would get the last n bytes from
// container log
response = r.path("ws").path("v1").path("node")
.path("containerlogs").path(containerIdStr).path(filename)
.queryParam("size", "-5")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertEquals(5, responseText.getBytes().length);
assertEquals(new String(logMessage.getBytes(),
logMessage.getBytes().length - 5, 5), responseText);
assertTrue(fullTextSize >= responseText.getBytes().length);
response = r.path("ws").path("v1").path("node")
.path("containerlogs").path(containerIdStr).path(filename)
.queryParam("size", "-10000")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertEquals(fullTextSize, responseText.getBytes().length);
assertEquals(logMessage, responseText);
// ask and download it // ask and download it
response = r.path("ws").path("v1").path("node").path("containerlogs") response = r.path("ws").path("v1").path("node").path("containerlogs")