From c34c823d26d87d5aee4127cd2b24e8b0d8f82369 Mon Sep 17 00:00:00 2001 From: Junping Du Date: Fri, 15 Sep 2017 15:33:24 -0700 Subject: [PATCH] YARN-7174. Add retry logic in LogsCLI when fetch running application logs. Contributed by Xuan Gong. (cherry picked from commit 1a84c24b0cf6674fa755403971fa57d8e412b320) --- .../hadoop/yarn/client/cli/LogsCLI.java | 175 ++++++++++++++- .../hadoop/yarn/client/cli/TestLogsCLI.java | 205 ++++++++++++------ 2 files changed, 309 insertions(+), 71 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java index f2d9837dcf9..c049a7454a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java @@ -22,6 +22,9 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; +import java.net.ConnectException; +import java.net.SocketException; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -75,9 +78,11 @@ import org.codehaus.jettison.json.JSONObject; import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientRequest; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.UniformInterfaceException; import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.filter.ClientFilter; @Public @Evolving @@ -98,14 +103,27 @@ public class LogsCLI extends Configured implements Tool { = "show_container_log_info"; private static final String OUT_OPTION = "out"; private static 
final String SIZE_OPTION = "size"; + private static final String CLIENT_MAX_RETRY_OPTION = "client_max_retries"; + private static final String CLIENT_RETRY_INTERVAL_OPTION + = "client_retry_interval_ms"; public static final String HELP_CMD = "help"; + private PrintStream outStream = System.out; private YarnClient yarnClient = null; + private Client webServiceClient = null; + + private static final int DEFAULT_MAX_RETRIES = 30; + private static final long DEFAULT_RETRY_INTERVAL = 1000; + + @Private + @VisibleForTesting + ClientConnectionRetry connectionRetry; @Override public int run(String[] args) throws Exception { try { yarnClient = createYarnClient(); + webServiceClient = Client.create(); return runCommand(args); } finally { if (yarnClient != null) { @@ -140,6 +158,8 @@ public class LogsCLI extends Configured implements Tool { List amContainersList = new ArrayList(); String localDir = null; long bytes = Long.MAX_VALUE; + int maxRetries = DEFAULT_MAX_RETRIES; + long retryInterval = DEFAULT_RETRY_INTERVAL; try { CommandLine commandLine = parser.parse(opts, args, false); appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION); @@ -171,6 +191,14 @@ public class LogsCLI extends Configured implements Tool { if (commandLine.hasOption(SIZE_OPTION)) { bytes = Long.parseLong(commandLine.getOptionValue(SIZE_OPTION)); } + if (commandLine.hasOption(CLIENT_MAX_RETRY_OPTION)) { + maxRetries = Integer.parseInt(commandLine.getOptionValue( + CLIENT_MAX_RETRY_OPTION)); + } + if (commandLine.hasOption(CLIENT_RETRY_INTERVAL_OPTION)) { + retryInterval = Long.parseLong(commandLine.getOptionValue( + CLIENT_RETRY_INTERVAL_OPTION)); + } } catch (ParseException e) { System.err.println("options parsing failed: " + e.getMessage()); printHelpMessage(printOpts); @@ -232,6 +260,11 @@ public class LogsCLI extends Configured implements Tool { } } + // Set up Retry WebService Client + connectionRetry = new ClientConnectionRetry(maxRetries, retryInterval); + ClientJerseyRetryFilter 
retryFilter = new ClientJerseyRetryFilter(); + webServiceClient.addFilter(retryFilter); + LogCLIHelpers logCliHelper = new LogCLIHelpers(); logCliHelper.setConf(getConf()); @@ -342,7 +375,6 @@ public class LogsCLI extends Configured implements Tool { protected List getAMContainerInfoForRMWebService( Configuration conf, String appId) throws ClientHandlerException, UniformInterfaceException, JSONException { - Client webServiceClient = Client.create(); String webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(conf); WebResource webResource = webServiceClient.resource(webAppAddress); @@ -364,7 +396,6 @@ public class LogsCLI extends Configured implements Tool { private List getAMContainerInfoForAHSWebService( Configuration conf, String appId) throws ClientHandlerException, UniformInterfaceException, JSONException { - Client webServiceClient = Client.create(); String webAppAddress = WebAppUtils.getHttpSchemePrefix(conf) + WebAppUtils.getAHSWebAppURLWithoutScheme(conf); @@ -417,7 +448,6 @@ public class LogsCLI extends Configured implements Tool { throws IOException { List> logFileInfos = new ArrayList<>(); - Client webServiceClient = Client.create(); try { WebResource webResource = webServiceClient .resource(WebAppUtils.getHttpSchemePrefix(conf) + nodeHttpAddress); @@ -491,7 +521,6 @@ public class LogsCLI extends Configured implements Tool { lastModificationTime); } - @Private @VisibleForTesting public int printContainerLogsFromRunningApplication(Configuration conf, @@ -522,7 +551,6 @@ public class LogsCLI extends Configured implements Tool { ContainerLogsRequest newOptions = new ContainerLogsRequest(request); newOptions.setLogTypes(matchedFiles); - Client webServiceClient = Client.create(); boolean foundAnyLogs = false; byte[] buffer = new byte[65536]; for (String logFile : newOptions.getLogTypes()) { @@ -798,6 +826,10 @@ public class LogsCLI extends Configured implements Tool { } } + /** + * Create Command Options. 
+ * @return the command options + */ private Options createCommandOpts() { Options opts = new Options(); opts.addOption(HELP_CMD, false, "Displays help for all commands."); @@ -860,6 +892,13 @@ public class LogsCLI extends Configured implements Tool { opts.addOption(SIZE_OPTION, true, "Prints the log file's first 'n' bytes " + "or the last 'n' bytes. Use negative values as bytes to read from " + "the end and positive values as bytes to read from the beginning."); + opts.addOption(CLIENT_MAX_RETRY_OPTION, true, "Set max retry number for a" + + " retry client to get the container logs for the running " + + "applications. Use a negative value to make retry forever. " + + "The default value is 30."); + opts.addOption(CLIENT_RETRY_INTERVAL_OPTION, true, + "Work with --client_max_retries to create a retry client. " + + "The default value is 1000."); opts.getOption(APPLICATION_ID_OPTION).setArgName("Application ID"); opts.getOption(CONTAINER_ID_OPTION).setArgName("Container ID"); opts.getOption(NODE_ADDRESS_OPTION).setArgName("Node Address"); @@ -867,9 +906,17 @@ public class LogsCLI extends Configured implements Tool { opts.getOption(AM_CONTAINER_OPTION).setArgName("AM Containers"); opts.getOption(OUT_OPTION).setArgName("Local Directory"); opts.getOption(SIZE_OPTION).setArgName("size"); + opts.getOption(CLIENT_MAX_RETRY_OPTION).setArgName("Max Retries"); + opts.getOption(CLIENT_RETRY_INTERVAL_OPTION) + .setArgName("Retry Interval"); return opts; } + /** + * Create Print options for helper message. 
+   * @param commandOpts the options
+   * @return the print options
+   */
   private Options createPrintOpts(Options commandOpts) {
     Options printOpts = new Options();
     printOpts.addOption(commandOpts.getOption(HELP_CMD));
@@ -885,6 +932,8 @@ public class LogsCLI extends Configured implements Tool {
     printOpts.addOption(commandOpts.getOption(SIZE_OPTION));
     printOpts.addOption(commandOpts.getOption(
         PER_CONTAINER_LOG_FILES_REGEX_OPTION));
+    printOpts.addOption(commandOpts.getOption(CLIENT_MAX_RETRY_OPTION));
+    printOpts.addOption(commandOpts.getOption(CLIENT_RETRY_INTERVAL_OPTION));
     return printOpts;
   }
 
@@ -1287,4 +1336,146 @@ public class LogsCLI extends Configured implements Tool {
     return nodeInfo.has("nodeHTTPAddress") ?
         nodeInfo.getString("nodeHTTPAddress") : null;
   }
+
+  /**
+   * Runs an operation and retries it, sleeping between attempts, while it
+   * keeps failing with an exception the operation declares retriable.
+   */
+  static class ClientConnectionRetry {
+
+    // maxRetries < 0 means keep trying
+    @Private
+    @VisibleForTesting
+    public int maxRetries;
+
+    // Sleep time between two attempts, in milliseconds
+    @Private
+    @VisibleForTesting
+    public long retryInterval;
+
+    // Indicates if retries happened last time. Only tests should read it.
+    // In unit tests, retryOn() calls should _not_ be concurrent.
+    private boolean retried = false;
+
+    // NOTE: "getRetired" is a misspelling of "getRetried"; the name is kept
+    // because existing tests call it.
+    @Private
+    @VisibleForTesting
+    boolean getRetired() {
+      return retried;
+    }
+
+    // Constructor taking the caller-supplied retry settings
+    public ClientConnectionRetry(int inputMaxRetries,
+        long inputRetryInterval) {
+      this.maxRetries = inputMaxRetries;
+      this.retryInterval = inputRetryInterval;
+    }
+
+    /**
+     * Run the operation, retrying while it fails with a retriable exception.
+     *
+     * @param op the operation to run
+     * @return the result of the first successful run of the operation
+     * @throws IOException if the operation fails with an IOException it
+     *         does not want retried
+     * @throws RuntimeException if the operation fails with a
+     *         RuntimeException it does not want retried, if the retry limit
+     *         is exceeded, or if the thread is interrupted between attempts
+     */
+    public Object retryOn(ClientRetryOp op)
+        throws RuntimeException, IOException {
+      int leftRetries = maxRetries;
+      retried = false;
+
+      // keep trying
+      while (true) {
+        try {
+          // try perform the op, if fail, keep retrying
+          return op.run();
+        } catch (IOException | RuntimeException e) {
+          // Exceptions the op does not want retried are rethrown unchanged,
+          // even when no retries are left, so they are never reported as
+          // an exceeded retry limit.
+          if (!op.shouldRetryOn(e)) {
+            throw e;
+          }
+          // give up if there's no retries left, keeping the last failure
+          // as the cause
+          if (leftRetries == 0) {
+            throw new RuntimeException(
+                "Connection retries limit exceeded.", e);
+          }
+          logException(e, leftRetries);
+        }
+        if (leftRetries > 0) {
+          leftRetries--;
+        }
+        retried = true;
+        try {
+          // sleep for the given time interval
+          Thread.sleep(retryInterval);
+        } catch (InterruptedException ie) {
+          // Restore the interrupt status and stop retrying: with the status
+          // set, every further sleep would throw immediately.
+          Thread.currentThread().interrupt();
+          throw new RuntimeException("Client retry sleep interrupted! ", ie);
+        }
+      }
+    }
+
+    private void logException(Exception e, int leftRetries) {
+      if (leftRetries > 0) {
+        System.out.println("Exception caught by ClientConnectionRetry,"
+            + " will try " + leftRetries + " more time(s).\nMessage: "
+            + e.getMessage());
+      } else {
+        // note that maxRetries may be -1 at the very beginning
+        System.out.println("ConnectionException caught by ClientConnectionRetry,"
+            + " will keep retrying.\nMessage: "
+            + e.getMessage());
+      }
+    }
+  }
+
+  // Jersey client filter that runs every request through connectionRetry
+  private class ClientJerseyRetryFilter extends ClientFilter {
+    @Override
+    public ClientResponse handle(final ClientRequest cr)
+        throws ClientHandlerException {
+      // Set up the retry operation
+      ClientRetryOp jerseyRetryOp = new ClientRetryOp() {
+        @Override
+        public Object run() {
+          // Try pass the request, if fail, keep retrying
+          return getNext().handle(cr);
+        }
+
+        @Override
+        public boolean shouldRetryOn(Exception e) {
+          // Only retry on connection exceptions
+          Throwable cause = e.getCause();
+          return (e instanceof ClientHandlerException)
+              && (cause instanceof ConnectException ||
+                  cause instanceof SocketTimeoutException ||
+                  cause instanceof SocketException);
+        }
+      };
+      try {
+        return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
+      } catch (IOException e) {
+        // keep the IOException as the cause so its stack trace is not lost
+        throw new ClientHandlerException("Jersey retry failed!\nMessage: "
+            + e.getMessage(), e);
+      }
+    }
+  }
+
+  // Abstract class for an operation that should be retried by client
+  private static abstract class ClientRetryOp {
+    // The operation that should be retried
+    public abstract Object run() throws IOException;
+    // The method to indicate if we should retry given the incoming exception
+    public abstract boolean shouldRetryOn(Exception e);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index 9b0268f57b9..e9d984ea46f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -195,7 +195,7 @@ public class TestLogsCLI { "Unable to get ApplicationState")); } - @Test(timeout = 5000l) + @Test (timeout = 10000) public void testHelpMessage() throws Exception { Configuration conf = new YarnConfiguration(); YarnClient mockYarnClient = createMockYarnClient( @@ -206,79 +206,102 @@ public class TestLogsCLI { int exitCode = dumper.run(new String[]{}); assertTrue(exitCode == -1); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintWriter pw = new PrintWriter(baos); pw.println("Retrieve logs for YARN applications."); pw.println("usage: yarn logs -applicationId [OPTIONS]"); pw.println(); pw.println("general options are:"); - pw.println(" -am Prints the AM Container logs for"); - pw.println(" this application. Specify"); - pw.println(" comma-separated value to get logs"); - pw.println(" for related AM Container. For"); - pw.println(" example, If we specify -am 1,2,"); - pw.println(" we will get the logs for the"); - pw.println(" first AM Container as well as the"); - pw.println(" second AM Container. To get logs"); - pw.println(" for all AM Containers, use -am"); - pw.println(" ALL. To get logs for the latest"); - pw.println(" AM Container, use -am -1. By"); - pw.println(" default, it will print all"); - pw.println(" available logs. Work with"); - pw.println(" -log_files to get only specific"); - pw.println(" logs."); - pw.println(" -appOwner AppOwner (assumed to be current"); - pw.println(" user if not specified)"); - pw.println(" -containerId ContainerId. By default, it will"); - pw.println(" print all available logs. 
Work"); - pw.println(" with -log_files to get only"); - pw.println(" specific logs. If specified, the"); - pw.println(" applicationId can be omitted"); - pw.println(" -help Displays help for all commands."); - pw.println(" -list_nodes Show the list of nodes that"); - pw.println(" successfully aggregated logs."); - pw.println(" This option can only be used with"); - pw.println(" finished applications."); - pw.println(" -log_files Specify comma-separated value to"); - pw.println(" get exact matched log files. Use"); - pw.println(" \"ALL\" or \"*\" to fetch all the log"); - pw.println(" files for the container."); - pw.println(" -log_files_pattern Specify comma-separated value to"); - pw.println(" get matched log files by using"); - pw.println(" java regex. Use \".*\" to fetch all"); - pw.println(" the log files for the container."); - pw.println(" -nodeAddress NodeAddress in the format"); - pw.println(" nodename:port"); - pw.println(" -out Local directory for storing"); - pw.println(" individual container logs. The"); - pw.println(" container logs will be stored"); - pw.println(" based on the node the container"); - pw.println(" ran on."); - pw.println(" -show_application_log_info Show the containerIds which"); - pw.println(" belong to the specific"); - pw.println(" Application. You can combine this"); - pw.println(" with --nodeAddress to get"); - pw.println(" containerIds for all the"); - pw.println(" containers on the specific"); - pw.println(" NodeManager."); - pw.println(" -show_container_log_info Show the container log metadata,"); - pw.println(" including log-file names, the"); - pw.println(" size of the log files. 
You can"); - pw.println(" combine this with --containerId"); - pw.println(" to get log metadata for the"); - pw.println(" specific container, or with"); - pw.println(" --nodeAddress to get log metadata"); - pw.println(" for all the containers on the"); - pw.println(" specific NodeManager."); - pw.println(" -size Prints the log file's first 'n'"); - pw.println(" bytes or the last 'n' bytes. Use"); - pw.println(" negative values as bytes to read"); - pw.println(" from the end and positive values"); - pw.println(" as bytes to read from the"); - pw.println(" beginning."); + pw.println(" -am Prints the AM Container logs"); + pw.println(" for this application."); + pw.println(" Specify comma-separated"); + pw.println(" value to get logs for"); + pw.println(" related AM Container. For"); + pw.println(" example, If we specify -am"); + pw.println(" 1,2, we will get the logs"); + pw.println(" for the first AM Container"); + pw.println(" as well as the second AM"); + pw.println(" Container. To get logs for"); + pw.println(" all AM Containers, use -am"); + pw.println(" ALL. To get logs for the"); + pw.println(" latest AM Container, use -am"); + pw.println(" -1. By default, it will"); + pw.println(" print all available logs."); + pw.println(" Work with -log_files to get"); + pw.println(" only specific logs."); + pw.println(" -appOwner AppOwner (assumed to be"); + pw.println(" current user if not"); + pw.println(" specified)"); + pw.println(" -client_max_retries Set max retry number for a"); + pw.println(" retry client to get the"); + pw.println(" container logs for the"); + pw.println(" running applications. Use a"); + pw.println(" negative value to make retry"); + pw.println(" forever. The default value"); + pw.println(" is 30."); + pw.println(" -client_retry_interval_ms Work with"); + pw.println(" --client_max_retries to"); + pw.println(" create a retry client. The"); + pw.println(" default value is 1000."); + pw.println(" -containerId ContainerId. 
By default, it"); + pw.println(" will print all available"); + pw.println(" logs. Work with -log_files"); + pw.println(" to get only specific logs."); + pw.println(" If specified, the"); + pw.println(" applicationId can be omitted"); + pw.println(" -help Displays help for all"); + pw.println(" commands."); + pw.println(" -list_nodes Show the list of nodes that"); + pw.println(" successfully aggregated"); + pw.println(" logs. This option can only"); + pw.println(" be used with finished"); + pw.println(" applications."); + pw.println(" -log_files Specify comma-separated"); + pw.println(" value to get exact matched"); + pw.println(" log files. Use \"ALL\" or \"*\""); + pw.println(" to fetch all the log files"); + pw.println(" for the container."); + pw.println(" -log_files_pattern Specify comma-separated"); + pw.println(" value to get matched log"); + pw.println(" files by using java regex."); + pw.println(" Use \".*\" to fetch all the"); + pw.println(" log files for the container."); + pw.println(" -nodeAddress NodeAddress in the format"); + pw.println(" nodename:port"); + pw.println(" -out Local directory for storing"); + pw.println(" individual container logs."); + pw.println(" The container logs will be"); + pw.println(" stored based on the node the"); + pw.println(" container ran on."); + pw.println(" -show_application_log_info Show the containerIds which"); + pw.println(" belong to the specific"); + pw.println(" Application. You can combine"); + pw.println(" this with --nodeAddress to"); + pw.println(" get containerIds for all the"); + pw.println(" containers on the specific"); + pw.println(" NodeManager."); + pw.println(" -show_container_log_info Show the container log"); + pw.println(" metadata, including log-file"); + pw.println(" names, the size of the log"); + pw.println(" files. 
You can combine this");
+    pw.println(" with --containerId to get");
+    pw.println(" log metadata for the");
+    pw.println(" specific container, or with");
+    pw.println(" --nodeAddress to get log");
+    pw.println(" metadata for all the");
+    pw.println(" containers on the specific");
+    pw.println(" NodeManager.");
+    pw.println(" -size Prints the log file's first");
+    pw.println(" 'n' bytes or the last 'n'");
+    pw.println(" bytes. Use negative values");
+    pw.println(" as bytes to read from the");
+    pw.println(" end and positive values as");
+    pw.println(" bytes to read from the");
+    pw.println(" beginning.");
     pw.close();
     String appReportStr = baos.toString("UTF-8");
-    Assert.assertEquals(appReportStr, sysOutStream.toString());
+    Assert.assertTrue(sysOutStream.toString().contains(appReportStr));
   }
 
   @Test (timeout = 15000)
@@ -608,6 +631,64 @@ public class TestLogsCLI {
     fs.delete(new Path(rootLogDir), true);
   }
 
+  /**
+   * Checks that fetching logs of a running container gives up after the
+   * requested number of connection retries. The container report points at
+   * http://localhost:2345, where no NodeManager is expected to listen.
+   */
+  @Test (timeout = 15000)
+  public void testCheckRetryCount() throws Exception {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+    NodeId nodeId = NodeId.newInstance("localhost", 1234);
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId
+        .newInstance(appId, 1);
+
+    // Create a mock ApplicationAttempt Report
+    ApplicationAttemptReport mockAttemptReport = mock(
+        ApplicationAttemptReport.class);
+    doReturn(appAttemptId).when(mockAttemptReport).getApplicationAttemptId();
+    List<ApplicationAttemptReport> attemptReports = Arrays.asList(
+        mockAttemptReport);
+
+    // Create one mock containerReport
+    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
+    ContainerReport mockContainerReport1 = mock(ContainerReport.class);
+    doReturn(containerId1).when(mockContainerReport1).getContainerId();
+    doReturn(nodeId).when(mockContainerReport1).getAssignedNode();
+    doReturn("http://localhost:2345").when(mockContainerReport1)
+        .getNodeHttpAddress();
+    doReturn(ContainerState.RUNNING).when(mockContainerReport1)
+        .getContainerState();
+    List<ContainerReport> containerReports = Arrays.asList(
+        mockContainerReport1);
+    // Mock the YarnClient, and it would report the previously created
+    // mockAttemptReport and mockContainerReport1
+    YarnClient mockYarnClient = createMockYarnClient(
+        YarnApplicationState.RUNNING, ugi.getShortUserName(), true,
+        attemptReports, containerReports);
+    doReturn(mockContainerReport1).when(mockYarnClient).getContainerReport(
+        any(ContainerId.class));
+    LogsCLI cli = new LogsCLIForTest(mockYarnClient);
+    cli.setConf(new YarnConfiguration());
+    try {
+      // A short retry interval keeps the five retries from sleeping for
+      // the default one second each.
+      cli.run(new String[] {"-containerId",
+          containerId1.toString(), "-client_max_retries", "5",
+          "-client_retry_interval_ms", "100"});
+      Assert.fail("Exception expected! "
+          + "NodeManager should be off to run this test. ");
+    } catch (RuntimeException ce) {
+      Assert.assertTrue(
+          "Handler exception for reason other than retry: " + ce.getMessage(),
+          ce.getMessage().contains("Connection retries limit exceeded"));
+      Assert.assertTrue("Retry filter didn't perform any retries! ", cli
+          .connectionRetry.getRetired());
+    }
+  }
+
   @Test (timeout = 5000)
   public void testGetRunningContainerLogs() throws Exception {
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();