YARN-5199. Close LogReader in in AHSWebServices#getStreamingOutput and

FileInputStream in NMWebServices#getLogs. Contributed by Xuan Gong

(cherry picked from commit 58be55b6e0)
This commit is contained in:
Xuan 2016-06-07 16:07:02 -07:00
parent 6a9f38ebaf
commit 10f0c0475e
2 changed files with 118 additions and 108 deletions

View File

@ -40,7 +40,6 @@
import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.Response.Status;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -363,86 +362,94 @@ public void write(OutputStream os) throws IOException,
if ((nodeId == null || nodeName.contains(LogAggregationUtils if ((nodeId == null || nodeName.contains(LogAggregationUtils
.getNodeString(nodeId))) && !nodeName.endsWith( .getNodeString(nodeId))) && !nodeName.endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX)) { LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader = AggregatedLogFormat.LogReader reader = null;
new AggregatedLogFormat.LogReader(conf, try {
thisNodeFile.getPath()); reader = new AggregatedLogFormat.LogReader(conf,
DataInputStream valueStream; thisNodeFile.getPath());
LogKey key = new LogKey(); DataInputStream valueStream;
valueStream = reader.next(key); LogKey key = new LogKey();
while (valueStream != null && !key.toString()
.equals(containerIdStr)) {
// Next container
key = new LogKey();
valueStream = reader.next(key); valueStream = reader.next(key);
} while (valueStream != null && !key.toString()
if (valueStream == null) { .equals(containerIdStr)) {
continue; // Next container
} key = new LogKey();
while (true) { valueStream = reader.next(key);
try { }
String fileType = valueStream.readUTF(); if (valueStream == null) {
String fileLengthStr = valueStream.readUTF(); continue;
long fileLength = Long.parseLong(fileLengthStr); }
if (fileType.equalsIgnoreCase(logFile)) { while (true) {
StringBuilder sb = new StringBuilder(); try {
sb.append("LogType:"); String fileType = valueStream.readUTF();
sb.append(fileType + "\n"); String fileLengthStr = valueStream.readUTF();
sb.append("Log Upload Time:"); long fileLength = Long.parseLong(fileLengthStr);
sb.append(Times.format(System.currentTimeMillis()) + "\n"); if (fileType.equalsIgnoreCase(logFile)) {
sb.append("LogLength:"); StringBuilder sb = new StringBuilder();
sb.append(fileLengthStr + "\n"); sb.append("LogType:");
sb.append("Log Contents:\n"); sb.append(fileType + "\n");
byte[] b = sb.toString().getBytes(Charset.forName("UTF-8")); sb.append("Log Upload Time:");
os.write(b, 0, b.length); sb.append(Times.format(System.currentTimeMillis()) + "\n");
sb.append("LogLength:");
sb.append(fileLengthStr + "\n");
sb.append("Log Contents:\n");
byte[] b = sb.toString().getBytes(
Charset.forName("UTF-8"));
os.write(b, 0, b.length);
long toSkip = 0; long toSkip = 0;
long totalBytesToRead = fileLength; long totalBytesToRead = fileLength;
if (bytes < 0) { if (bytes < 0) {
long absBytes = Math.abs(bytes); long absBytes = Math.abs(bytes);
if (absBytes < fileLength) { if (absBytes < fileLength) {
toSkip = fileLength - absBytes; toSkip = fileLength - absBytes;
totalBytesToRead = absBytes; totalBytesToRead = absBytes;
}
long skippedBytes = valueStream.skip(toSkip);
if (skippedBytes != toSkip) {
throw new IOException("The bytes were skipped are "
+ "different from the caller requested");
}
} else {
if (bytes < fileLength) {
totalBytesToRead = bytes;
}
} }
long skippedBytes = valueStream.skip(toSkip);
if (skippedBytes != toSkip) {
throw new IOException("The bytes were skipped are "
+ "different from the caller requested");
}
} else {
if (bytes < fileLength) {
totalBytesToRead = bytes;
}
}
long curRead = 0; long curRead = 0;
long pendingRead = totalBytesToRead - curRead; long pendingRead = totalBytesToRead - curRead;
int toRead = pendingRead > buf.length ? buf.length int toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
int len = valueStream.read(buf, 0, toRead);
while (len != -1 && curRead < totalBytesToRead) {
os.write(buf, 0, len);
curRead += len;
pendingRead = totalBytesToRead - curRead;
toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead; : (int) pendingRead;
len = valueStream.read(buf, 0, toRead); int len = valueStream.read(buf, 0, toRead);
} while (len != -1 && curRead < totalBytesToRead) {
sb = new StringBuilder(); os.write(buf, 0, len);
sb.append("\nEnd of LogType:" + fileType + "\n"); curRead += len;
b = sb.toString().getBytes(Charset.forName("UTF-8"));
os.write(b, 0, b.length); pendingRead = totalBytesToRead - curRead;
findLogs = true; toRead = pendingRead > buf.length ? buf.length
} else { : (int) pendingRead;
long totalSkipped = 0; len = valueStream.read(buf, 0, toRead);
long currSkipped = 0; }
while (currSkipped != -1 && totalSkipped < fileLength) { sb = new StringBuilder();
currSkipped = valueStream.skip(fileLength - totalSkipped); sb.append("\nEnd of LogType:" + fileType + "\n");
totalSkipped += currSkipped; b = sb.toString().getBytes(Charset.forName("UTF-8"));
os.write(b, 0, b.length);
findLogs = true;
} else {
long totalSkipped = 0;
long currSkipped = 0;
while (currSkipped != -1 && totalSkipped < fileLength) {
currSkipped = valueStream.skip(
fileLength - totalSkipped);
totalSkipped += currSkipped;
}
} }
} catch (EOFException eof) {
break;
} }
} catch (EOFException eof) { }
break; } finally {
if (reader != null) {
reader.close();
} }
} }
} }

View File

@ -37,7 +37,7 @@
import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo; import javax.ws.rs.core.UriInfo;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -236,7 +236,6 @@ public Response getLogs(@PathParam("containerid") String containerIdStr,
} }
boolean downloadFile = parseBooleanParam(download); boolean downloadFile = parseBooleanParam(download);
final long bytes = parseLongParam(size); final long bytes = parseLongParam(size);
try { try {
final FileInputStream fis = ContainerLogsUtils.openLogFileForRead( final FileInputStream fis = ContainerLogsUtils.openLogFileForRead(
containerIdStr, logFile, nmContext); containerIdStr, logFile, nmContext);
@ -246,42 +245,46 @@ public Response getLogs(@PathParam("containerid") String containerIdStr,
@Override @Override
public void write(OutputStream os) throws IOException, public void write(OutputStream os) throws IOException,
WebApplicationException { WebApplicationException {
int bufferSize = 65536; try {
byte[] buf = new byte[bufferSize]; int bufferSize = 65536;
long toSkip = 0; byte[] buf = new byte[bufferSize];
long totalBytesToRead = fileLength; long toSkip = 0;
if (bytes < 0) { long totalBytesToRead = fileLength;
long absBytes = Math.abs(bytes); if (bytes < 0) {
if (absBytes < fileLength) { long absBytes = Math.abs(bytes);
toSkip = fileLength - absBytes; if (absBytes < fileLength) {
totalBytesToRead = absBytes; toSkip = fileLength - absBytes;
totalBytesToRead = absBytes;
}
long skippedBytes = fis.skip(toSkip);
if (skippedBytes != toSkip) {
throw new IOException("The bytes were skipped are different "
+ "from the caller requested");
}
} else {
if (bytes < fileLength) {
totalBytesToRead = bytes;
}
} }
long skippedBytes = fis.skip(toSkip);
if (skippedBytes != toSkip) {
throw new IOException("The bytes were skipped are different "
+ "from the caller requested");
}
} else {
if (bytes < fileLength) {
totalBytesToRead = bytes;
}
}
long curRead = 0; long curRead = 0;
long pendingRead = totalBytesToRead - curRead; long pendingRead = totalBytesToRead - curRead;
int toRead = pendingRead > buf.length ? buf.length int toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
int len = fis.read(buf, 0, toRead);
while (len != -1 && curRead < totalBytesToRead) {
os.write(buf, 0, len);
curRead += len;
pendingRead = totalBytesToRead - curRead;
toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead; : (int) pendingRead;
len = fis.read(buf, 0, toRead); int len = fis.read(buf, 0, toRead);
while (len != -1 && curRead < totalBytesToRead) {
os.write(buf, 0, len);
curRead += len;
pendingRead = totalBytesToRead - curRead;
toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
len = fis.read(buf, 0, toRead);
}
os.flush();
} finally {
IOUtils.closeQuietly(fis);
} }
os.flush();
} }
}; };
ResponseBuilder resp = Response.ok(stream); ResponseBuilder resp = Response.ok(stream);