svn merge -c 1458466 FIXES: YARN-200. yarn log does not output all needed information, and is in a binary format. Contributed by Ravi Prakash
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1458473 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
35bc29813f
commit
f854b9863b
|
@ -362,6 +362,9 @@ Release 0.23.7 - UNRELEASED
|
||||||
YARN-468. coverage fix for org.apache.hadoop.yarn.server.webproxy.amfilter
|
YARN-468. coverage fix for org.apache.hadoop.yarn.server.webproxy.amfilter
|
||||||
(Aleksey Gorshkov via bobby)
|
(Aleksey Gorshkov via bobby)
|
||||||
|
|
||||||
|
YARN-200. yarn log does not output all needed information, and is in a
|
||||||
|
binary format (Ravi Prakash via jlowe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-357. App submission should not be synchronized (daryn)
|
YARN-357. App submission should not be synchronized (daryn)
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.PrintStream;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -505,7 +506,7 @@ public class AggregatedLogFormat {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static void readAContainerLogsForALogType(
|
public static void readAContainerLogsForALogType(
|
||||||
DataInputStream valueStream, DataOutputStream out)
|
DataInputStream valueStream, PrintStream out)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
byte[] buf = new byte[65535];
|
byte[] buf = new byte[65535];
|
||||||
|
@ -513,11 +514,11 @@ public class AggregatedLogFormat {
|
||||||
String fileType = valueStream.readUTF();
|
String fileType = valueStream.readUTF();
|
||||||
String fileLengthStr = valueStream.readUTF();
|
String fileLengthStr = valueStream.readUTF();
|
||||||
long fileLength = Long.parseLong(fileLengthStr);
|
long fileLength = Long.parseLong(fileLengthStr);
|
||||||
out.writeUTF("\nLogType:");
|
out.print("LogType: ");
|
||||||
out.writeUTF(fileType);
|
out.println(fileType);
|
||||||
out.writeUTF("\nLogLength:");
|
out.print("LogLength: ");
|
||||||
out.writeUTF(fileLengthStr);
|
out.println(fileLengthStr);
|
||||||
out.writeUTF("\nLog Contents:\n");
|
out.println("Log Contents:");
|
||||||
|
|
||||||
int curRead = 0;
|
int curRead = 0;
|
||||||
long pendingRead = fileLength - curRead;
|
long pendingRead = fileLength - curRead;
|
||||||
|
@ -533,6 +534,7 @@ public class AggregatedLogFormat {
|
||||||
pendingRead > buf.length ? buf.length : (int) pendingRead;
|
pendingRead > buf.length ? buf.length : (int) pendingRead;
|
||||||
len = valueStream.read(buf, 0, toRead);
|
len = valueStream.read(buf, 0, toRead);
|
||||||
}
|
}
|
||||||
|
out.println("");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
|
|
|
@ -19,10 +19,10 @@
|
||||||
package org.apache.hadoop.yarn.logaggregation;
|
package org.apache.hadoop.yarn.logaggregation;
|
||||||
|
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.PrintStream;
|
||||||
|
|
||||||
import org.apache.commons.cli.CommandLine;
|
import org.apache.commons.cli.CommandLine;
|
||||||
import org.apache.commons.cli.CommandLineParser;
|
import org.apache.commons.cli.CommandLineParser;
|
||||||
|
@ -30,6 +30,7 @@ import org.apache.commons.cli.GnuParser;
|
||||||
import org.apache.commons.cli.HelpFormatter;
|
import org.apache.commons.cli.HelpFormatter;
|
||||||
import org.apache.commons.cli.Options;
|
import org.apache.commons.cli.Options;
|
||||||
import org.apache.commons.cli.ParseException;
|
import org.apache.commons.cli.ParseException;
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.Configured;
|
import org.apache.hadoop.conf.Configured;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
@ -57,10 +58,13 @@ public class LogDumper extends Configured implements Tool {
|
||||||
public int run(String[] args) throws Exception {
|
public int run(String[] args) throws Exception {
|
||||||
|
|
||||||
Options opts = new Options();
|
Options opts = new Options();
|
||||||
opts.addOption(APPLICATION_ID_OPTION, true, "ApplicationId");
|
opts.addOption(APPLICATION_ID_OPTION, true, "ApplicationId (required)");
|
||||||
opts.addOption(CONTAINER_ID_OPTION, true, "ContainerId");
|
opts.addOption(CONTAINER_ID_OPTION, true,
|
||||||
opts.addOption(NODE_ADDRESS_OPTION, true, "NodeAddress");
|
"ContainerId (must be specified if node address is specified)");
|
||||||
opts.addOption(APP_OWNER_OPTION, true, "AppOwner");
|
opts.addOption(NODE_ADDRESS_OPTION, true, "NodeAddress in the format "
|
||||||
|
+ "nodename:port (must be specified if container id is specified)");
|
||||||
|
opts.addOption(APP_OWNER_OPTION, true,
|
||||||
|
"AppOwner (assumed to be current user if not specified)");
|
||||||
|
|
||||||
if (args.length < 1) {
|
if (args.length < 1) {
|
||||||
HelpFormatter formatter = new HelpFormatter();
|
HelpFormatter formatter = new HelpFormatter();
|
||||||
|
@ -99,14 +103,12 @@ public class LogDumper extends Configured implements Tool {
|
||||||
ApplicationId appId =
|
ApplicationId appId =
|
||||||
ConverterUtils.toApplicationId(recordFactory, appIdStr);
|
ConverterUtils.toApplicationId(recordFactory, appIdStr);
|
||||||
|
|
||||||
DataOutputStream out = new DataOutputStream(System.out);
|
|
||||||
|
|
||||||
if (appOwner == null || appOwner.isEmpty()) {
|
if (appOwner == null || appOwner.isEmpty()) {
|
||||||
appOwner = UserGroupInformation.getCurrentUser().getShortUserName();
|
appOwner = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
}
|
}
|
||||||
int resultCode = 0;
|
int resultCode = 0;
|
||||||
if (containerIdStr == null && nodeAddress == null) {
|
if (containerIdStr == null && nodeAddress == null) {
|
||||||
resultCode = dumpAllContainersLogs(appId, appOwner, out);
|
resultCode = dumpAllContainersLogs(appId, appOwner, System.out);
|
||||||
} else if ((containerIdStr == null && nodeAddress != null)
|
} else if ((containerIdStr == null && nodeAddress != null)
|
||||||
|| (containerIdStr != null && nodeAddress == null)) {
|
|| (containerIdStr != null && nodeAddress == null)) {
|
||||||
System.out.println("ContainerId or NodeAddress cannot be null!");
|
System.out.println("ContainerId or NodeAddress cannot be null!");
|
||||||
|
@ -125,7 +127,7 @@ public class LogDumper extends Configured implements Tool {
|
||||||
appOwner,
|
appOwner,
|
||||||
ConverterUtils.toNodeId(nodeAddress),
|
ConverterUtils.toNodeId(nodeAddress),
|
||||||
LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf())));
|
LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf())));
|
||||||
resultCode = dumpAContainerLogs(containerIdStr, reader, out);
|
resultCode = dumpAContainerLogs(containerIdStr, reader, System.out);
|
||||||
}
|
}
|
||||||
|
|
||||||
return resultCode;
|
return resultCode;
|
||||||
|
@ -149,12 +151,11 @@ public class LogDumper extends Configured implements Tool {
|
||||||
"Log aggregation has not completed or is not enabled.");
|
"Log aggregation has not completed or is not enabled.");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
DataOutputStream out = new DataOutputStream(System.out);
|
return dumpAContainerLogs(containerId, reader, System.out);
|
||||||
return dumpAContainerLogs(containerId, reader, out);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private int dumpAContainerLogs(String containerIdStr,
|
private int dumpAContainerLogs(String containerIdStr,
|
||||||
AggregatedLogFormat.LogReader reader, DataOutputStream out)
|
AggregatedLogFormat.LogReader reader, PrintStream out)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
DataInputStream valueStream;
|
DataInputStream valueStream;
|
||||||
LogKey key = new LogKey();
|
LogKey key = new LogKey();
|
||||||
|
@ -183,7 +184,7 @@ public class LogDumper extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
private int dumpAllContainersLogs(ApplicationId appId, String appOwner,
|
private int dumpAllContainersLogs(ApplicationId appId, String appOwner,
|
||||||
DataOutputStream out) throws IOException {
|
PrintStream out) throws IOException {
|
||||||
Path remoteRootLogDir =
|
Path remoteRootLogDir =
|
||||||
new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
|
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
|
||||||
|
@ -216,6 +217,9 @@ public class LogDumper extends Configured implements Tool {
|
||||||
valueStream = reader.next(key);
|
valueStream = reader.next(key);
|
||||||
|
|
||||||
while (valueStream != null) {
|
while (valueStream != null) {
|
||||||
|
String containerString = "\n\nContainer: " + key + " on " + thisNodeFile.getPath().getName();
|
||||||
|
out.println(containerString);
|
||||||
|
out.println(StringUtils.repeat("=", containerString.length()));
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
LogReader.readAContainerLogsForALogType(valueStream, out);
|
LogReader.readAContainerLogsForALogType(valueStream, out);
|
||||||
|
|
|
@ -22,11 +22,13 @@ import static org.mockito.Mockito.*;
|
||||||
import static junit.framework.Assert.assertEquals;
|
import static junit.framework.Assert.assertEquals;
|
||||||
import static junit.framework.Assert.assertTrue;
|
import static junit.framework.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileWriter;
|
import java.io.FileWriter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.PrintStream;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
@ -40,6 +42,7 @@ import java.util.Set;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
|
@ -531,24 +534,26 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
DataOutputBuffer dob = new DataOutputBuffer();
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
LogReader.readAContainerLogsForALogType(valueStream, dob);
|
PrintStream ps = new PrintStream(baos);
|
||||||
|
LogReader.readAContainerLogsForALogType(valueStream, ps);
|
||||||
|
|
||||||
DataInputBuffer dib = new DataInputBuffer();
|
String writtenLines[] = baos.toString().split(
|
||||||
dib.reset(dob.getData(), dob.getLength());
|
System.getProperty("line.separator"));
|
||||||
|
|
||||||
Assert.assertEquals("\nLogType:", dib.readUTF());
|
Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8));
|
||||||
String fileType = dib.readUTF();
|
String fileType = writtenLines[0].substring(9);
|
||||||
|
|
||||||
Assert.assertEquals("\nLogLength:", dib.readUTF());
|
Assert.assertEquals("LogLength:", writtenLines[1].substring(0, 10));
|
||||||
String fileLengthStr = dib.readUTF();
|
String fileLengthStr = writtenLines[1].substring(11);
|
||||||
long fileLength = Long.parseLong(fileLengthStr);
|
long fileLength = Long.parseLong(fileLengthStr);
|
||||||
|
|
||||||
Assert.assertEquals("\nLog Contents:\n", dib.readUTF());
|
Assert.assertEquals("Log Contents:",
|
||||||
byte[] buf = new byte[(int) fileLength]; // cast is okay in this
|
writtenLines[2].substring(0, 13));
|
||||||
// test.
|
|
||||||
dib.read(buf, 0, (int) fileLength);
|
String logContents = StringUtils.join(
|
||||||
perContainerMap.put(fileType, new String(buf));
|
Arrays.copyOfRange(writtenLines, 3, writtenLines.length), "\n");
|
||||||
|
perContainerMap.put(fileType, logContents);
|
||||||
|
|
||||||
LOG.info("LogType:" + fileType);
|
LOG.info("LogType:" + fileType);
|
||||||
LOG.info("LogType:" + fileLength);
|
LOG.info("LogType:" + fileLength);
|
||||||
|
|
Loading…
Reference in New Issue