YARN-2788. Fixed backwards compatiblity issues with log-aggregation feature that were caused when adding log-upload-time via YARN-2703. Contributed by Xuan Gong.
(cherry picked from commit 58e9f24e0f
)
This commit is contained in:
parent
dd82a1eb75
commit
c205841d49
|
@ -770,6 +770,10 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2798. Fixed YarnClient to populate the renewer correctly for Timeline
|
YARN-2798. Fixed YarnClient to populate the renewer correctly for Timeline
|
||||||
delegation tokens. (Zhijie Shen via vinodkv)
|
delegation tokens. (Zhijie Shen via vinodkv)
|
||||||
|
|
||||||
|
YARN-2788. Fixed backwards compatiblity issues with log-aggregation feature
|
||||||
|
that were caused when adding log-upload-time via YARN-2703. (Xuan Gong via
|
||||||
|
vinodkv)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -25,6 +25,7 @@ import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.DataOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileWriter;
|
import java.io.FileWriter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -33,6 +34,7 @@ import java.io.PrintWriter;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -170,9 +172,9 @@ public class TestLogsCLI {
|
||||||
ApplicationId appId = ApplicationIdPBImpl.newInstance(0, 1);
|
ApplicationId appId = ApplicationIdPBImpl.newInstance(0, 1);
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId =
|
||||||
ApplicationAttemptIdPBImpl.newInstance(appId, 1);
|
ApplicationAttemptIdPBImpl.newInstance(appId, 1);
|
||||||
|
ContainerId containerId0 = ContainerIdPBImpl.newInstance(appAttemptId, 0);
|
||||||
ContainerId containerId1 = ContainerIdPBImpl.newInstance(appAttemptId, 1);
|
ContainerId containerId1 = ContainerIdPBImpl.newInstance(appAttemptId, 1);
|
||||||
ContainerId containerId2 = ContainerIdPBImpl.newInstance(appAttemptId, 2);
|
ContainerId containerId2 = ContainerIdPBImpl.newInstance(appAttemptId, 2);
|
||||||
|
|
||||||
NodeId nodeId = NodeId.newInstance("localhost", 1234);
|
NodeId nodeId = NodeId.newInstance("localhost", 1234);
|
||||||
|
|
||||||
// create local logs
|
// create local logs
|
||||||
|
@ -201,7 +203,15 @@ public class TestLogsCLI {
|
||||||
fs.delete(path, true);
|
fs.delete(path, true);
|
||||||
}
|
}
|
||||||
assertTrue(fs.mkdirs(path));
|
assertTrue(fs.mkdirs(path));
|
||||||
|
|
||||||
// upload container logs into remote directory
|
// upload container logs into remote directory
|
||||||
|
// the first two logs is empty. When we try to read first two logs,
|
||||||
|
// we will meet EOF exception, but it will not impact other logs.
|
||||||
|
// Other logs should be read successfully.
|
||||||
|
uploadEmptyContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
|
||||||
|
containerId0, path, fs);
|
||||||
|
uploadEmptyContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
|
||||||
|
containerId1, path, fs);
|
||||||
uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
|
uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
|
||||||
containerId1, path, fs);
|
containerId1, path, fs);
|
||||||
uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
|
uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
|
||||||
|
@ -220,6 +230,9 @@ public class TestLogsCLI {
|
||||||
"Hello container_0_0001_01_000002!"));
|
"Hello container_0_0001_01_000002!"));
|
||||||
sysOutStream.reset();
|
sysOutStream.reset();
|
||||||
|
|
||||||
|
// uploaded two logs for container1. The first log is empty.
|
||||||
|
// The second one is not empty.
|
||||||
|
// We can still successfully read logs for container1.
|
||||||
exitCode =
|
exitCode =
|
||||||
cli.run(new String[] { "-applicationId", appId.toString(),
|
cli.run(new String[] { "-applicationId", appId.toString(),
|
||||||
"-nodeAddress", nodeId.toString(), "-containerId",
|
"-nodeAddress", nodeId.toString(), "-containerId",
|
||||||
|
@ -228,6 +241,22 @@ public class TestLogsCLI {
|
||||||
assertTrue(sysOutStream.toString().contains(
|
assertTrue(sysOutStream.toString().contains(
|
||||||
"Hello container_0_0001_01_000001!"));
|
"Hello container_0_0001_01_000001!"));
|
||||||
assertTrue(sysOutStream.toString().contains("Log Upload Time"));
|
assertTrue(sysOutStream.toString().contains("Log Upload Time"));
|
||||||
|
assertTrue(!sysOutStream.toString().contains(
|
||||||
|
"Logs for container " + containerId1.toString()
|
||||||
|
+ " are not present in this log-file."));
|
||||||
|
sysOutStream.reset();
|
||||||
|
|
||||||
|
// Uploaded the empty log for container0.
|
||||||
|
// We should see the message showing the log for container0
|
||||||
|
// are not present.
|
||||||
|
exitCode =
|
||||||
|
cli.run(new String[] { "-applicationId", appId.toString(),
|
||||||
|
"-nodeAddress", nodeId.toString(), "-containerId",
|
||||||
|
containerId0.toString() });
|
||||||
|
assertTrue(exitCode == -1);
|
||||||
|
assertTrue(sysOutStream.toString().contains(
|
||||||
|
"Logs for container " + containerId0.toString()
|
||||||
|
+ " are not present in this log-file."));
|
||||||
|
|
||||||
fs.delete(new Path(remoteLogRootDir), true);
|
fs.delete(new Path(remoteLogRootDir), true);
|
||||||
fs.delete(new Path(rootLogDir), true);
|
fs.delete(new Path(rootLogDir), true);
|
||||||
|
@ -266,6 +295,31 @@ public class TestLogsCLI {
|
||||||
writer.close();
|
writer.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi,
|
||||||
|
Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
|
||||||
|
ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
|
||||||
|
Path path =
|
||||||
|
new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
|
||||||
|
+ System.currentTimeMillis());
|
||||||
|
AggregatedLogFormat.LogWriter writer =
|
||||||
|
new AggregatedLogFormat.LogWriter(configuration, path, ugi);
|
||||||
|
writer.writeApplicationOwner(ugi.getUserName());
|
||||||
|
|
||||||
|
Map<ApplicationAccessType, String> appAcls =
|
||||||
|
new HashMap<ApplicationAccessType, String>();
|
||||||
|
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
|
||||||
|
writer.writeApplicationACLs(appAcls);
|
||||||
|
DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
|
||||||
|
new AggregatedLogFormat.LogKey(containerId).write(out);
|
||||||
|
out.close();
|
||||||
|
out = writer.getWriter().prepareAppendValue(-1);
|
||||||
|
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
|
||||||
|
UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
|
||||||
|
new HashSet<File>());
|
||||||
|
out.close();
|
||||||
|
writer.close();
|
||||||
|
}
|
||||||
|
|
||||||
private YarnClient createMockYarnClient(YarnApplicationState appState)
|
private YarnClient createMockYarnClient(YarnApplicationState appState)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
YarnClient mockClient = mock(YarnClient.class);
|
YarnClient mockClient = mock(YarnClient.class);
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
|
import java.io.OutputStream;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
@ -44,6 +45,7 @@ import java.util.Set;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.commons.io.input.BoundedInputStream;
|
import org.apache.commons.io.input.BoundedInputStream;
|
||||||
|
import org.apache.commons.io.output.WriterOutputStream;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
@ -233,9 +235,6 @@ public class AggregatedLogFormat {
|
||||||
// Write the logFile Type
|
// Write the logFile Type
|
||||||
out.writeUTF(logFile.getName());
|
out.writeUTF(logFile.getName());
|
||||||
|
|
||||||
// Write the uploaded TimeStamp
|
|
||||||
out.writeLong(System.currentTimeMillis());
|
|
||||||
|
|
||||||
// Write the log length as UTF so that it is printable
|
// Write the log length as UTF so that it is printable
|
||||||
out.writeUTF(String.valueOf(fileLength));
|
out.writeUTF(String.valueOf(fileLength));
|
||||||
|
|
||||||
|
@ -400,6 +399,11 @@ public class AggregatedLogFormat {
|
||||||
writeVersion();
|
writeVersion();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public TFile.Writer getWriter() {
|
||||||
|
return this.writer;
|
||||||
|
}
|
||||||
|
|
||||||
private void writeVersion() throws IOException {
|
private void writeVersion() throws IOException {
|
||||||
DataOutputStream out = this.writer.prepareAppendKey(-1);
|
DataOutputStream out = this.writer.prepareAppendKey(-1);
|
||||||
VERSION_KEY.write(out);
|
VERSION_KEY.write(out);
|
||||||
|
@ -639,69 +643,54 @@ public class AggregatedLogFormat {
|
||||||
* Writes all logs for a single container to the provided writer.
|
* Writes all logs for a single container to the provided writer.
|
||||||
* @param valueStream
|
* @param valueStream
|
||||||
* @param writer
|
* @param writer
|
||||||
|
* @param logUploadedTime
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static void readAcontainerLogs(DataInputStream valueStream,
|
public static void readAcontainerLogs(DataInputStream valueStream,
|
||||||
Writer writer) throws IOException {
|
Writer writer, long logUploadedTime) throws IOException {
|
||||||
int bufferSize = 65536;
|
OutputStream os = null;
|
||||||
char[] cbuf = new char[bufferSize];
|
PrintStream ps = null;
|
||||||
String fileType;
|
try {
|
||||||
long uploadTime;
|
os = new WriterOutputStream(writer);
|
||||||
String fileLengthStr;
|
ps = new PrintStream(os);
|
||||||
long fileLength;
|
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
fileType = valueStream.readUTF();
|
readContainerLogs(valueStream, ps, logUploadedTime);
|
||||||
} catch (EOFException e) {
|
} catch (EOFException e) {
|
||||||
// EndOfFile
|
// EndOfFile
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
uploadTime = valueStream.readLong();
|
|
||||||
fileLengthStr = valueStream.readUTF();
|
|
||||||
fileLength = Long.parseLong(fileLengthStr);
|
|
||||||
writer.write("\n\nLogType:");
|
|
||||||
writer.write(fileType);
|
|
||||||
writer.write("\nLogUploadTime:");
|
|
||||||
writer.write(String.valueOf(uploadTime));
|
|
||||||
writer.write("\nLogLength:");
|
|
||||||
writer.write(fileLengthStr);
|
|
||||||
writer.write("\nLog Contents:\n");
|
|
||||||
// ByteLevel
|
|
||||||
BoundedInputStream bis =
|
|
||||||
new BoundedInputStream(valueStream, fileLength);
|
|
||||||
InputStreamReader reader = new InputStreamReader(bis);
|
|
||||||
int currentRead = 0;
|
|
||||||
int totalRead = 0;
|
|
||||||
while ((currentRead = reader.read(cbuf, 0, bufferSize)) != -1) {
|
|
||||||
writer.write(cbuf, 0, currentRead);
|
|
||||||
totalRead += currentRead;
|
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(LOG, ps);
|
||||||
|
IOUtils.cleanup(LOG, os);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Keep calling this till you get a {@link EOFException} for getting logs of
|
* Writes all logs for a single container to the provided writer.
|
||||||
* all types for a single container.
|
|
||||||
*
|
|
||||||
* @param valueStream
|
* @param valueStream
|
||||||
* @param out
|
* @param writer
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static void readAContainerLogsForALogType(
|
public static void readAcontainerLogs(DataInputStream valueStream,
|
||||||
DataInputStream valueStream, PrintStream out)
|
Writer writer) throws IOException {
|
||||||
throws IOException {
|
readAcontainerLogs(valueStream, writer, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void readContainerLogs(DataInputStream valueStream,
|
||||||
|
PrintStream out, long logUploadedTime) throws IOException {
|
||||||
byte[] buf = new byte[65535];
|
byte[] buf = new byte[65535];
|
||||||
|
|
||||||
String fileType = valueStream.readUTF();
|
String fileType = valueStream.readUTF();
|
||||||
long uploadTime = valueStream.readLong();
|
|
||||||
String fileLengthStr = valueStream.readUTF();
|
String fileLengthStr = valueStream.readUTF();
|
||||||
long fileLength = Long.parseLong(fileLengthStr);
|
long fileLength = Long.parseLong(fileLengthStr);
|
||||||
out.print("LogType:");
|
out.print("LogType:");
|
||||||
out.println(fileType);
|
out.println(fileType);
|
||||||
|
if (logUploadedTime != -1) {
|
||||||
out.print("Log Upload Time:");
|
out.print("Log Upload Time:");
|
||||||
out.println(Times.format(uploadTime));
|
out.println(Times.format(logUploadedTime));
|
||||||
|
}
|
||||||
out.print("LogLength:");
|
out.print("LogLength:");
|
||||||
out.println(fileLengthStr);
|
out.println(fileLengthStr);
|
||||||
out.println("Log Contents:");
|
out.println("Log Contents:");
|
||||||
|
@ -723,6 +712,35 @@ public class AggregatedLogFormat {
|
||||||
out.println("");
|
out.println("");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Keep calling this till you get a {@link EOFException} for getting logs of
|
||||||
|
* all types for a single container.
|
||||||
|
*
|
||||||
|
* @param valueStream
|
||||||
|
* @param out
|
||||||
|
* @param logUploadedTime
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static void readAContainerLogsForALogType(
|
||||||
|
DataInputStream valueStream, PrintStream out, long logUploadedTime)
|
||||||
|
throws IOException {
|
||||||
|
readContainerLogs(valueStream, out, logUploadedTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Keep calling this till you get a {@link EOFException} for getting logs of
|
||||||
|
* all types for a single container.
|
||||||
|
*
|
||||||
|
* @param valueStream
|
||||||
|
* @param out
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static void readAContainerLogsForALogType(
|
||||||
|
DataInputStream valueStream, PrintStream out)
|
||||||
|
throws IOException {
|
||||||
|
readAContainerLogsForALogType(valueStream, out, -1);
|
||||||
|
}
|
||||||
|
|
||||||
public void close() {
|
public void close() {
|
||||||
IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
|
IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
|
||||||
}
|
}
|
||||||
|
@ -732,7 +750,6 @@ public class AggregatedLogFormat {
|
||||||
public static class ContainerLogsReader {
|
public static class ContainerLogsReader {
|
||||||
private DataInputStream valueStream;
|
private DataInputStream valueStream;
|
||||||
private String currentLogType = null;
|
private String currentLogType = null;
|
||||||
private long currentLogUpLoadTime = 0;
|
|
||||||
private long currentLogLength = 0;
|
private long currentLogLength = 0;
|
||||||
private BoundedInputStream currentLogData = null;
|
private BoundedInputStream currentLogData = null;
|
||||||
private InputStreamReader currentLogISR;
|
private InputStreamReader currentLogISR;
|
||||||
|
@ -753,14 +770,12 @@ public class AggregatedLogFormat {
|
||||||
}
|
}
|
||||||
|
|
||||||
currentLogType = null;
|
currentLogType = null;
|
||||||
currentLogUpLoadTime = 0;
|
|
||||||
currentLogLength = 0;
|
currentLogLength = 0;
|
||||||
currentLogData = null;
|
currentLogData = null;
|
||||||
currentLogISR = null;
|
currentLogISR = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
String logType = valueStream.readUTF();
|
String logType = valueStream.readUTF();
|
||||||
long logUpLoadTime = valueStream.readLong();
|
|
||||||
String logLengthStr = valueStream.readUTF();
|
String logLengthStr = valueStream.readUTF();
|
||||||
currentLogLength = Long.parseLong(logLengthStr);
|
currentLogLength = Long.parseLong(logLengthStr);
|
||||||
currentLogData =
|
currentLogData =
|
||||||
|
@ -768,7 +783,6 @@ public class AggregatedLogFormat {
|
||||||
currentLogData.setPropagateClose(false);
|
currentLogData.setPropagateClose(false);
|
||||||
currentLogISR = new InputStreamReader(currentLogData);
|
currentLogISR = new InputStreamReader(currentLogData);
|
||||||
currentLogType = logType;
|
currentLogType = logType;
|
||||||
currentLogUpLoadTime = logUpLoadTime;
|
|
||||||
} catch (EOFException e) {
|
} catch (EOFException e) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -779,10 +793,6 @@ public class AggregatedLogFormat {
|
||||||
return currentLogType;
|
return currentLogType;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getCurrentLogUpLoadTime() {
|
|
||||||
return currentLogUpLoadTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getCurrentLogLength() {
|
public long getCurrentLogLength() {
|
||||||
return currentLogLength;
|
return currentLogLength;
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,7 +78,8 @@ public class LogCLIHelpers implements Configurable {
|
||||||
reader =
|
reader =
|
||||||
new AggregatedLogFormat.LogReader(getConf(),
|
new AggregatedLogFormat.LogReader(getConf(),
|
||||||
thisNodeFile.getPath());
|
thisNodeFile.getPath());
|
||||||
if (dumpAContainerLogs(containerId, reader, System.out) > -1) {
|
if (dumpAContainerLogs(containerId, reader, System.out,
|
||||||
|
thisNodeFile.getModificationTime()) > -1) {
|
||||||
foundContainerLogs = true;
|
foundContainerLogs = true;
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -97,7 +98,8 @@ public class LogCLIHelpers implements Configurable {
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public int dumpAContainerLogs(String containerIdStr,
|
public int dumpAContainerLogs(String containerIdStr,
|
||||||
AggregatedLogFormat.LogReader reader, PrintStream out) throws IOException {
|
AggregatedLogFormat.LogReader reader, PrintStream out,
|
||||||
|
long logUploadedTime) throws IOException {
|
||||||
DataInputStream valueStream;
|
DataInputStream valueStream;
|
||||||
LogKey key = new LogKey();
|
LogKey key = new LogKey();
|
||||||
valueStream = reader.next(key);
|
valueStream = reader.next(key);
|
||||||
|
@ -112,15 +114,21 @@ public class LogCLIHelpers implements Configurable {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean foundContainerLogs = false;
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
LogReader.readAContainerLogsForALogType(valueStream, out);
|
LogReader.readAContainerLogsForALogType(valueStream, out,
|
||||||
|
logUploadedTime);
|
||||||
|
foundContainerLogs = true;
|
||||||
} catch (EOFException eof) {
|
} catch (EOFException eof) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (foundContainerLogs) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public int dumpAllContainersLogs(ApplicationId appId, String appOwner,
|
public int dumpAllContainersLogs(ApplicationId appId, String appOwner,
|
||||||
|
@ -157,13 +165,15 @@ public class LogCLIHelpers implements Configurable {
|
||||||
valueStream = reader.next(key);
|
valueStream = reader.next(key);
|
||||||
|
|
||||||
while (valueStream != null) {
|
while (valueStream != null) {
|
||||||
|
|
||||||
String containerString =
|
String containerString =
|
||||||
"\n\nContainer: " + key + " on " + thisNodeFile.getPath().getName();
|
"\n\nContainer: " + key + " on " + thisNodeFile.getPath().getName();
|
||||||
out.println(containerString);
|
out.println(containerString);
|
||||||
out.println(StringUtils.repeat("=", containerString.length()));
|
out.println(StringUtils.repeat("=", containerString.length()));
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
LogReader.readAContainerLogsForALogType(valueStream, out);
|
LogReader.readAContainerLogsForALogType(valueStream, out,
|
||||||
|
thisNodeFile.getModificationTime());
|
||||||
foundAnyLogs = true;
|
foundAnyLogs = true;
|
||||||
} catch (EOFException eof) {
|
} catch (EOFException eof) {
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -126,6 +126,7 @@ public class AggregatedLogsBlock extends HtmlBlock {
|
||||||
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
long logUploadedTime = thisNodeFile.getModificationTime();
|
||||||
reader =
|
reader =
|
||||||
new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath());
|
new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath());
|
||||||
|
|
||||||
|
@ -164,7 +165,7 @@ public class AggregatedLogsBlock extends HtmlBlock {
|
||||||
}
|
}
|
||||||
|
|
||||||
foundLog = readContainerLogs(html, logReader, logLimits,
|
foundLog = readContainerLogs(html, logReader, logLimits,
|
||||||
desiredLogType);
|
desiredLogType, logUploadedTime);
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
LOG.error("Error getting logs for " + logEntity, ex);
|
LOG.error("Error getting logs for " + logEntity, ex);
|
||||||
continue;
|
continue;
|
||||||
|
@ -189,7 +190,7 @@ public class AggregatedLogsBlock extends HtmlBlock {
|
||||||
|
|
||||||
private boolean readContainerLogs(Block html,
|
private boolean readContainerLogs(Block html,
|
||||||
AggregatedLogFormat.ContainerLogsReader logReader, LogLimits logLimits,
|
AggregatedLogFormat.ContainerLogsReader logReader, LogLimits logLimits,
|
||||||
String desiredLogType) throws IOException {
|
String desiredLogType, long logUpLoadTime) throws IOException {
|
||||||
int bufferSize = 65536;
|
int bufferSize = 65536;
|
||||||
char[] cbuf = new char[bufferSize];
|
char[] cbuf = new char[bufferSize];
|
||||||
|
|
||||||
|
@ -199,13 +200,12 @@ public class AggregatedLogsBlock extends HtmlBlock {
|
||||||
if (desiredLogType == null || desiredLogType.isEmpty()
|
if (desiredLogType == null || desiredLogType.isEmpty()
|
||||||
|| desiredLogType.equals(logType)) {
|
|| desiredLogType.equals(logType)) {
|
||||||
long logLength = logReader.getCurrentLogLength();
|
long logLength = logReader.getCurrentLogLength();
|
||||||
long logUpLoadTime = logReader.getCurrentLogUpLoadTime();
|
|
||||||
if (foundLog) {
|
if (foundLog) {
|
||||||
html.pre()._("\n\n")._();
|
html.pre()._("\n\n")._();
|
||||||
}
|
}
|
||||||
|
|
||||||
html.p()._("Log Type: " + logType)._();
|
html.p()._("Log Type: " + logType)._();
|
||||||
html.p()._("Log UpLoadTime: " + Times.format(logUpLoadTime))._();
|
html.p()._("Log Upload Time: " + Times.format(logUpLoadTime))._();
|
||||||
html.p()._("Log Length: " + Long.toString(logLength))._();
|
html.p()._("Log Length: " + Long.toString(logLength))._();
|
||||||
|
|
||||||
long start = logLimits.start < 0
|
long start = logLimits.start < 0
|
||||||
|
|
|
@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
|
||||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
|
||||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
|
||||||
|
import org.apache.hadoop.yarn.util.Times;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -178,9 +179,16 @@ public class TestAggregatedLogFormat {
|
||||||
logWriter.close();
|
logWriter.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
//Verify the output generated by readAContainerLogs(DataInputStream, Writer)
|
|
||||||
@Test
|
@Test
|
||||||
public void testReadAcontainerLogs1() throws Exception {
|
public void testReadAcontainerLogs1() throws Exception {
|
||||||
|
//Verify the output generated by readAContainerLogs(DataInputStream, Writer, logUploadedTime)
|
||||||
|
testReadAcontainerLog(true);
|
||||||
|
|
||||||
|
//Verify the output generated by readAContainerLogs(DataInputStream, Writer)
|
||||||
|
testReadAcontainerLog(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testReadAcontainerLog(boolean logUploadedTime) throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
File workDir = new File(testWorkDir, "testReadAcontainerLogs1");
|
File workDir = new File(testWorkDir, "testReadAcontainerLogs1");
|
||||||
Path remoteAppLogFile =
|
Path remoteAppLogFile =
|
||||||
|
@ -233,17 +241,23 @@ public class TestAggregatedLogFormat {
|
||||||
LogKey rLogKey = new LogKey();
|
LogKey rLogKey = new LogKey();
|
||||||
DataInputStream dis = logReader.next(rLogKey);
|
DataInputStream dis = logReader.next(rLogKey);
|
||||||
Writer writer = new StringWriter();
|
Writer writer = new StringWriter();
|
||||||
|
|
||||||
|
if (logUploadedTime) {
|
||||||
|
LogReader.readAcontainerLogs(dis, writer, System.currentTimeMillis());
|
||||||
|
} else {
|
||||||
LogReader.readAcontainerLogs(dis, writer);
|
LogReader.readAcontainerLogs(dis, writer);
|
||||||
|
}
|
||||||
|
|
||||||
// We should only do the log aggregation for stdout.
|
// We should only do the log aggregation for stdout.
|
||||||
// Since we could not open the fileInputStream for stderr, this file is not
|
// Since we could not open the fileInputStream for stderr, this file is not
|
||||||
// aggregated.
|
// aggregated.
|
||||||
String s = writer.toString();
|
String s = writer.toString();
|
||||||
int expectedLength =
|
int expectedLength =
|
||||||
"\n\nLogType:stdout".length()
|
"LogType:stdout".length()
|
||||||
+ ("\nLogUploadTime:" + System.currentTimeMillis()).length()
|
+ (logUploadedTime ? ("\nLog Upload Time:" + Times.format(System
|
||||||
|
.currentTimeMillis())).length() : 0)
|
||||||
+ ("\nLogLength:" + numChars).length()
|
+ ("\nLogLength:" + numChars).length()
|
||||||
+ "\nLog Contents:\n".length() + numChars;
|
+ "\nLog Contents:\n".length() + numChars + "\n".length();
|
||||||
Assert.assertTrue("LogType not matched", s.contains("LogType:stdout"));
|
Assert.assertTrue("LogType not matched", s.contains("LogType:stdout"));
|
||||||
Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr"));
|
Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr"));
|
||||||
Assert.assertTrue("log file:logs should not be aggregated.", !s.contains("LogType:logs"));
|
Assert.assertTrue("log file:logs should not be aggregated.", !s.contains("LogType:logs"));
|
||||||
|
|
|
@ -767,30 +767,27 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
try {
|
try {
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
PrintStream ps = new PrintStream(baos);
|
PrintStream ps = new PrintStream(baos);
|
||||||
|
|
||||||
LogReader.readAContainerLogsForALogType(valueStream, ps);
|
LogReader.readAContainerLogsForALogType(valueStream, ps);
|
||||||
|
|
||||||
String writtenLines[] = baos.toString().split(
|
String writtenLines[] = baos.toString().split(
|
||||||
System.getProperty("line.separator"));
|
System.getProperty("line.separator"));
|
||||||
|
|
||||||
Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8));
|
Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8));
|
||||||
String fileType = writtenLines[0].substring(9);
|
String fileType = writtenLines[0].substring(8);
|
||||||
|
|
||||||
Assert.assertEquals("LogUploadTime:", writtenLines[1].substring(0, 14));
|
Assert.assertEquals("LogLength:", writtenLines[1].substring(0, 10));
|
||||||
String fileUploadedTimeStr = writtenLines[1].substring(15);
|
String fileLengthStr = writtenLines[1].substring(10);
|
||||||
|
|
||||||
Assert.assertEquals("LogLength:", writtenLines[2].substring(0, 10));
|
|
||||||
String fileLengthStr = writtenLines[2].substring(11);
|
|
||||||
long fileLength = Long.parseLong(fileLengthStr);
|
long fileLength = Long.parseLong(fileLengthStr);
|
||||||
|
|
||||||
Assert.assertEquals("Log Contents:",
|
Assert.assertEquals("Log Contents:",
|
||||||
writtenLines[3].substring(0, 13));
|
writtenLines[2].substring(0, 13));
|
||||||
|
|
||||||
String logContents = StringUtils.join(
|
String logContents = StringUtils.join(
|
||||||
Arrays.copyOfRange(writtenLines, 4, writtenLines.length), "\n");
|
Arrays.copyOfRange(writtenLines, 3, writtenLines.length), "\n");
|
||||||
perContainerMap.put(fileType, logContents);
|
perContainerMap.put(fileType, logContents);
|
||||||
|
|
||||||
LOG.info("LogType:" + fileType);
|
LOG.info("LogType:" + fileType);
|
||||||
LOG.info("LogUploadTime:" + fileUploadedTimeStr);
|
|
||||||
LOG.info("LogLength:" + fileLength);
|
LOG.info("LogLength:" + fileLength);
|
||||||
LOG.info("Log Contents:\n" + perContainerMap.get(fileType));
|
LOG.info("Log Contents:\n" + perContainerMap.get(fileType));
|
||||||
} catch (EOFException eof) {
|
} catch (EOFException eof) {
|
||||||
|
|
Loading…
Reference in New Issue