From ee005e010cff3f97a5daa8000ac2cd151e2631ca Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Thu, 28 Jan 2016 17:43:17 -0800 Subject: [PATCH] HADOOP-12702. Add an HDFS metrics sink. (Daniel Templeton via kasha) --- .../hadoop-common/CHANGES.txt | 2 + .../metrics2/sink/RollingFileSystemSink.java | 420 +++++++++++++++ .../sink/RollingFileSystemSinkTestBase.java | 506 ++++++++++++++++++ .../sink/TestRollingFileSystemSink.java | 156 ++++++ 4 files changed, 1084 insertions(+) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 4da20e0d4f2..4d01857ffee 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -650,6 +650,8 @@ Release 2.9.0 - UNRELEASED NEW FEATURES + HADOOP-12702. Add an HDFS metrics sink. (Daniel Templeton via kasha) + IMPROVEMENTS HADOOP-12321. Make JvmPauseMonitor an AbstractService. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java new file mode 100644 index 00000000000..8271362e50d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java @@ -0,0 +1,420 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.sink; + +import java.io.Closeable; +import java.io.IOException; +import java.io.PrintStream; +import java.net.InetAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.Date; +import java.util.TimeZone; + +import org.apache.commons.configuration.SubsetConfiguration; +import org.apache.commons.lang.time.FastDateFormat; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsException; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.MetricsSink; +import org.apache.hadoop.metrics2.MetricsTag; + +/** + * This class is a metrics sink that uses + * {@link org.apache.hadoop.fs.FileSystem} to write the metrics logs. Every + * hour a new directory will be created under the path specified by the + * basepath property. All metrics will be logged to a file in the + * current hour's directory in a file named <hostname>.log, where + * <hostname> is the name of the host on which the metrics logging + * process is running. 
The base path is set by the + * <prefix>.sink.<instance>.basepath property. The + * time zone used to create the current hour's directory name is GMT. If the + * basepath property isn't specified, it will default to + * "/tmp", which is the temp directory on whatever default file + * system is configured for the cluster. + * + * The <prefix>.sink.<instance>.ignore-error property + * controls whether an exception is thrown when an error is encountered writing + * a log file. The default value is true. When set to + * false, file errors are quietly swallowed. + * + * The primary use of this class is for logging to HDFS. As it uses + * {@link org.apache.hadoop.fs.FileSystem} to access the target file system, + * however, it can be used to write to the local file system, Amazon S3, or any + * other supported file system. The base path for the sink will determine the + * file system used. An unqualified path will write to the default file system + * set by the configuration. + * + * Not all file systems support the ability to append to files. In file systems + * without the ability to append to files, only one writer can write to a file + * at a time. To allow for concurrent writes from multiple daemons on a single + * host, the source property should be set to the name of the + * source daemon, e.g. namenode. The value of the source + * property should typically be the same as the property's prefix. If this + * property is not set, the source is taken to be unknown. + * + * Instead of appending to an existing file, by default the sink + * will create a new file with a suffix of ".<n>&quet;, where + * n is the next lowest integer that isn't already used in a file name, + * similar to the Hadoop daemon logs. NOTE: the file with the highest + * sequence number is the newest file, unlike the Hadoop daemon logs. + * + * For file systems that allow append, the sink supports appending to the + * existing file instead. 
If the allow-append property is set to + * true, the sink will instead append to the existing file on file systems that + * support appends. By default, the allow-append property is + * false. + * + * Note that when writing to HDFS with allow-append set to true, + * there is a minimum acceptable number of data nodes. If the number of data + * nodes drops below that minimum, the append will succeed, but reading the + * data will fail with an IOException in the DataStreamer class. The minimum + * number of data nodes required for a successful append is generally 2 or 3. + * + * Note also that when writing to HDFS, the file size information is not updated + * until the file is closed (e.g. at the top of the hour) even though the data + * is being written successfully. This is a known HDFS limitation that exists + * because of the performance cost of updating the metadata. See + * HDFS-5478. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class RollingFileSystemSink implements MetricsSink, Closeable { + private static final String BASEPATH_KEY = "basepath"; + private static final String SOURCE_KEY = "source"; + private static final String IGNORE_ERROR_KEY = "ignore-error"; + private static final String ALLOW_APPEND_KEY = "allow-append"; + private static final String SOURCE_DEFAULT = "unknown"; + private static final String BASEPATH_DEFAULT = "/tmp"; + private static final FastDateFormat DATE_FORMAT = + FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT")); + private String source; + private boolean ignoreError; + private boolean allowAppend; + private Path basePath; + private FileSystem fileSystem; + // The current directory path into which we're writing files + private Path currentDirPath; + // The path to the current file into which we're writing data + private Path currentFilePath; + // The stream to which we're currently writing. + private PrintStream currentOutStream; + // We keep this only to be able to call hsynch() on it. 
+ private FSDataOutputStream currentFSOutStream; + + @Override + public void init(SubsetConfiguration conf) { + basePath = new Path(conf.getString(BASEPATH_KEY, BASEPATH_DEFAULT)); + source = conf.getString(SOURCE_KEY, SOURCE_DEFAULT); + ignoreError = conf.getBoolean(IGNORE_ERROR_KEY, false); + allowAppend = conf.getBoolean(ALLOW_APPEND_KEY, false); + + try { + fileSystem = FileSystem.get(new URI(basePath.toString()), + new Configuration()); + } catch (URISyntaxException ex) { + throw new MetricsException("The supplied filesystem base path URI" + + " is not a valid URI: " + basePath.toString(), ex); + } catch (IOException ex) { + throw new MetricsException("Error connecting to file system: " + + basePath + " [" + ex.toString() + "]", ex); + } + + // If we're permitted to append, check if we actually can + if (allowAppend) { + allowAppend = checkAppend(fileSystem); + } + } + + /** + * Test whether the file system supports append and return the answer. + * @param fs the target file system + */ + private boolean checkAppend(FileSystem fs) { + boolean canAppend = true; + + try { + fs.append(basePath); + } catch (IOException ex) { + if (ex.getMessage().equals("Not supported")) { + canAppend = false; + } + } + + return canAppend; + } + + /** + * Check the current directory against the time stamp. If they're not + * the same, create a new directory and a new log file in that directory. + * + * @throws MetricsException thrown if an error occurs while creating the + * new directory or new log file + */ + private void rollLogDirIfNeeded() throws MetricsException { + String currentDir = DATE_FORMAT.format(new Date()); + Path path = new Path(basePath, currentDir); + + // We check whether currentOutStream is null instead of currentDirPath, + // because if currentDirPath is null, then currentOutStream is null, but + // currentOutStream can be null for other reasons. 
+ if ((currentOutStream == null) || !path.equals(currentDirPath)) { + currentDirPath = path; + + if (currentOutStream != null) { + currentOutStream.close(); + } + + try { + rollLogDir(); + } catch (IOException ex) { + throwMetricsException("Failed to creating new log file", ex); + } + } + } + + /** + * Create a new directory based on the current hour and a new log file in + * that directory. + * + * @throws IOException thrown if an error occurs while creating the + * new directory or new log file + */ + private void rollLogDir() throws IOException { + String fileName = + source + "-" + InetAddress.getLocalHost().getHostName() + ".log"; + + Path targetFile = new Path(currentDirPath, fileName); + fileSystem.mkdirs(currentDirPath); + + if (allowAppend) { + createOrAppendLogFile(targetFile); + } else { + createLogFile(targetFile); + } + } + + /** + * Create a new log file and return the {@link FSDataOutputStream}. If a + * file with the specified path already exists, add a suffix, starting with 1 + * and try again. Keep incrementing the suffix until a nonexistent target + * path is found. + * + * Once the file is open, update {@link #currentFSOutStream}, + * {@link #currentOutStream}, and {@#link #currentFile} are set appropriately. + * + * @param initial the target path + * @throws IOException thrown if the call to see if the exists fails + */ + private void createLogFile(Path initial) throws IOException { + Path currentAttempt = initial; + int id = 1; + + while (true) { + // First try blindly creating the file. If we fail, it either means + // the file exists, or the operation actually failed. We do it this way + // because if we check whether the file exists, it might still be created + // by the time we try to create it. Creating first works like a + // test-and-set. 
+ try { + currentFSOutStream = fileSystem.create(currentAttempt, false); + currentOutStream = new PrintStream(currentFSOutStream, true, + StandardCharsets.UTF_8.name()); + currentFilePath = currentAttempt; + break; + } catch (IOException ex) { + // Now we can check to see if the file exists to know why we failed + if (fileSystem.exists(currentAttempt)) { + currentAttempt = new Path(initial.toString() + "." + id); + id += 1; + } else { + throw ex; + } + } + } + } + + /** + * Create a new log file and return the {@link FSDataOutputStream}. If a + * file with the specified path already exists, open the file for append + * instead. + * + * Once the file is open, update {@link #currentFSOutStream}, + * {@link #currentOutStream}, and {@#link #currentFile} are set appropriately. + * + * @param initial the target path + * @throws IOException thrown if the call to see the append operation fails. + */ + private void createOrAppendLogFile(Path targetFile) throws IOException { + // First try blindly creating the file. If we fail, it either means + // the file exists, or the operation actually failed. We do it this way + // because if we check whether the file exists, it might still be created + // by the time we try to create it. Creating first works like a + // test-and-set. + try { + currentFSOutStream = fileSystem.create(targetFile, false); + currentOutStream = new PrintStream(currentFSOutStream, true, + StandardCharsets.UTF_8.name()); + } catch (IOException ex) { + // Try appending instead. If we fail, if means the file doesn't + // actually exist yet or the operation actually failed. + try { + currentFSOutStream = fileSystem.append(targetFile); + currentOutStream = new PrintStream(currentFSOutStream, true, + StandardCharsets.UTF_8.name()); + } catch (IOException ex2) { + // If the original create failed for a legit but transitory + // reason, the append will fail because the file now doesn't exist, + // resulting in a confusing stack trace. 
To avoid that, we set + // the cause of the second exception to be the first exception. + // It's still a tiny bit confusing, but it's enough + // information that someone should be able to figure it out. + ex2.initCause(ex); + + throw ex2; + } + } + + currentFilePath = targetFile; + } + + @Override + public void putMetrics(MetricsRecord record) { + rollLogDirIfNeeded(); + + if (currentOutStream != null) { + currentOutStream.printf("%d %s.%s", record.timestamp(), + record.context(), record.name()); + + String separator = ": "; + + for (MetricsTag tag : record.tags()) { + currentOutStream.printf("%s%s=%s", separator, tag.name(), tag.value()); + separator = ", "; + } + + for (AbstractMetric metric : record.metrics()) { + currentOutStream.printf("%s%s=%s", separator, metric.name(), + metric.value()); + } + + currentOutStream.println(); + + // If we don't hflush(), the data may not be written until the file is + // closed. The file won't be closed until the top of the hour *AND* + // another record is received. Calling hflush() makes sure that the data + // is complete at the top of the hour. + try { + currentFSOutStream.hflush(); + } catch (IOException ex) { + throwMetricsException("Failed flushing the stream", ex); + } + + checkForErrors("Unable to write to log file"); + } else if (!ignoreError) { + throwMetricsException("Unable to write to log file"); + } + } + + @Override + public void flush() { + // currentOutStream is null if currentFSOutStream is null + if (currentFSOutStream != null) { + try { + currentFSOutStream.hflush(); + } catch (IOException ex) { + throwMetricsException("Unable to flush log file", ex); + } + } + } + + @Override + public void close() throws IOException { + if (currentOutStream != null) { + currentOutStream.close(); + + try { + checkForErrors("Unable to close log file"); + } finally { + // Null out the streams just in case someone tries to reuse us. 
+ currentOutStream = null; + currentFSOutStream = null; + } + } + } + + /** + * If the sink isn't set to ignore errors, throw a {@link MetricsException} + * if the stream encountered an exception. The message parameter will be used + * as the new exception's message with the current file name + * ({@link #currentFilePath}) appended to it. + * + * @param message the exception message. The message will have the current + * file name ({@link #currentFilePath}) appended to it. + * @throws MetricsException thrown if there was an error and the sink isn't + * ignoring errors + */ + private void checkForErrors(String message) + throws MetricsException { + if (!ignoreError && currentOutStream.checkError()) { + throw new MetricsException(message + ": " + currentFilePath); + } + } + + /** + * If the sink isn't set to ignore errors, wrap the Throwable in a + * {@link MetricsException} and throw it. The message parameter will be used + * as the new exception's message with the current file name + * ({@link #currentFilePath}) and the Throwable's string representation + * appended to it. + * + * @param message the exception message. The message will have the current + * file name ({@link #currentFilePath}) and the Throwable's string + * representation appended to it. + * @param t the Throwable to wrap + */ + private void throwMetricsException(String message, Throwable t) { + if (!ignoreError) { + throw new MetricsException(message + ": " + currentFilePath + " [" + + t.toString() + "]", t); + } + } + + /** + * If the sink isn't set to ignore errors, throw a new + * {@link MetricsException}. The message parameter will be used as the + * new exception's message with the current file name + * ({@link #currentFilePath}) appended to it. + * + * @param message the exception message. The message will have the current + * file name ({@link #currentFilePath}) appended to it. 
+ */ + private void throwMetricsException(String message) { + if (!ignoreError) { + throw new MetricsException(message + ": " + currentFilePath); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java new file mode 100644 index 00000000000..32132761e57 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java @@ -0,0 +1,506 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.metrics2.sink; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.InetAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.TimeZone; +import java.util.regex.Pattern; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.metrics2.MetricsException; +import org.apache.hadoop.metrics2.MetricsRecord; + +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.annotation.Metric.Type; +import org.apache.hadoop.metrics2.impl.ConfigBuilder; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.metrics2.impl.TestMetricsConfig; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.junit.AfterClass; +import org.junit.Rule; +import org.junit.rules.TestName; +import org.junit.Before; +import org.junit.BeforeClass; +import static org.junit.Assert.assertTrue; + +/** + * This class is a base class for testing the {@link RollingFileSystemSink} + * class in various contexts. It provides the a number of useful utility + * methods for classes that extend it. 
+ */ +public class RollingFileSystemSinkTestBase { + protected static final File ROOT_TEST_DIR = + new File(System.getProperty("test.build.data", "target/"), + "FileSystemSinkTest"); + protected static final SimpleDateFormat DATE_FORMAT = + new SimpleDateFormat("yyyyMMddHH"); + protected static File methodDir; + + /** + * The name of the current test method. + */ + @Rule + public TestName methodName = new TestName(); + + /** + * A sample metric class + */ + @Metrics(name="testRecord1", context="test1") + protected class MyMetrics1 { + @Metric(value={"testTag1", ""}, type=Type.TAG) + String testTag1() { return "testTagValue1"; } + + @Metric(value={"testTag2", ""}, type=Type.TAG) + String gettestTag2() { return "testTagValue2"; } + + @Metric(value={"testMetric1", "An integer gauge"}, always=true) + MutableGaugeInt testMetric1; + + @Metric(value={"testMetric2", "A long gauge"}, always=true) + MutableGaugeLong testMetric2; + + public MyMetrics1 registerWith(MetricsSystem ms) { + return ms.register(methodName.getMethodName() + "-m1", null, this); + } + } + + /** + * Another sample metrics class + */ + @Metrics(name="testRecord2", context="test1") + protected class MyMetrics2 { + @Metric(value={"testTag22", ""}, type=Type.TAG) + String testTag1() { return "testTagValue22"; } + + public MyMetrics2 registerWith(MetricsSystem ms) { + return ms.register(methodName.getMethodName() + "-m2", null, this); + } + } + + /** + * Set the date format's timezone to GMT. + */ + @BeforeClass + public static void setTZ() { + DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT")); + } + + /** + * Delete the test directory for this test. + * @throws IOException thrown if the delete fails + */ + @AfterClass + public static void deleteBaseDir() throws IOException { + FileUtils.deleteDirectory(ROOT_TEST_DIR); + } + + /** + * Create the test directory for this test. 
+ * @throws IOException thrown if the create fails + */ + @Before + public void createMethodDir() throws IOException { + methodDir = new File(ROOT_TEST_DIR, methodName.getMethodName()); + + methodDir.mkdirs(); + } + + /** + * Set up the metrics system, start it, and return it. + * @param path the base path for the sink + * @param ignoreErrors whether the sink should ignore errors + * @param allowAppend whether the sink is allowed to append to existing files + * @return the metrics system + */ + protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors, + boolean allowAppend) { + // If the prefix is not lower case, the metrics system won't be able to + // read any of the properties. + final String prefix = methodName.getMethodName().toLowerCase(); + + new ConfigBuilder().add("*.period", 10000) + .add(prefix + ".sink.mysink0.class", ErrorSink.class.getName()) + .add(prefix + ".sink.mysink0.basepath", path) + .add(prefix + ".sink.mysink0.source", "testsrc") + .add(prefix + ".sink.mysink0.context", "test1") + .add(prefix + ".sink.mysink0.ignore-error", ignoreErrors) + .add(prefix + ".sink.mysink0.allow-append", allowAppend) + .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-" + prefix)); + + MetricsSystemImpl ms = new MetricsSystemImpl(prefix); + + ms.start(); + + return ms; + } + + /** + * Helper method that writes metrics files to a target path, reads those + * files, and returns the contents of all files as a single string. This + * method will assert that the correct number of files is found. 
+ * + * @param ms an initialized MetricsSystem to use + * @param path the target path from which to read the logs + * @param count the number of log files to expect + * @return the contents of the log files + * @throws IOException when the log file can't be read + * @throws URISyntaxException when the target path is an invalid URL + */ + protected String doWriteTest(MetricsSystem ms, String path, int count) + throws IOException, URISyntaxException { + final String then = DATE_FORMAT.format(new Date()); + + MyMetrics1 mm1 = new MyMetrics1().registerWith(ms); + new MyMetrics2().registerWith(ms); + + mm1.testMetric1.incr(); + mm1.testMetric2.incr(2); + + ms.publishMetricsNow(); // publish the metrics + ms.stop(); + ms.shutdown(); + + return readLogFile(path, then, count); + } + + /** + * Read the log files at the target path and return the contents as a single + * string. This method will assert that the correct number of files is found. + * + * @param path the target path + * @param then when the test method began. Used to find the log directory in + * the case that the test run crosses the top of the hour. + * @param count the number of log files to expect + * @return + * @throws IOException + * @throws URISyntaxException + */ + protected String readLogFile(String path, String then, int count) + throws IOException, URISyntaxException { + final String now = DATE_FORMAT.format(new Date()); + final String logFile = + "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log"; + FileSystem fs = FileSystem.get(new URI(path), new Configuration()); + StringBuilder metrics = new StringBuilder(); + boolean found = false; + + for (FileStatus status : fs.listStatus(new Path(path))) { + Path logDir = status.getPath(); + + // There are only two possible valid log directory names: the time when + // the test started and the current time. Anything else can be ignored. 
+ if (now.equals(logDir.getName()) || then.equals(logDir.getName())) { + readLogData(fs, findMostRecentLogFile(fs, new Path(logDir, logFile)), + metrics); + assertFileCount(fs, logDir, count); + found = true; + } + } + + assertTrue("No valid log directories found", found); + + return metrics.toString(); + } + + /** + * Read the target log file and append its contents to the StringBuilder. + * @param fs the target FileSystem + * @param logFile the target file path + * @param metrics where to append the file contents + * @throws IOException thrown if the file cannot be read + */ + protected void readLogData(FileSystem fs, Path logFile, StringBuilder metrics) + throws IOException { + FSDataInputStream fsin = fs.open(logFile); + BufferedReader in = new BufferedReader(new InputStreamReader(fsin, + StandardCharsets.UTF_8)); + String line = null; + + while ((line = in.readLine()) != null) { + metrics.append(line).append("\n"); + } + } + + /** + * Return the path to the log file to use, based on the target path. + * @param fs the target FileSystem + * @param initial the path from which to start + * @return the path to use + * @throws IOException thrown if testing for file existence fails. + */ + protected Path findMostRecentLogFile(FileSystem fs, Path initial) + throws IOException { + Path logFile = null; + Path nextLogFile = initial; + int id = 1; + + do { + logFile = nextLogFile; + nextLogFile = new Path(initial.toString() + "." + id); + id += 1; + } while (fs.exists(nextLogFile)); + return logFile; + } + + /** + * Assert that the given contents match what is expected from the test + * metrics. + * + * @param contents the file contents to test + */ + protected void assertMetricsContents(String contents) { + // Note that in the below expression we allow tags and metrics to go in + // arbitrary order, but the records must be in order. 
+ final Pattern expectedContentPattern = Pattern.compile( + "^\\d+\\s+test1.testRecord1:\\s+Context=test1,\\s+" + + "(testTag1=testTagValue1,\\s+testTag2=testTagValue2|" + + "testTag2=testTagValue2,\\s+testTag1=testTagValue1)," + + "\\s+Hostname=.*,\\s+" + + "(testMetric1=1,\\s+testMetric2=2|testMetric2=2,\\s+testMetric1=1)" + + "[\\n\\r]*^\\d+\\s+test1.testRecord2:\\s+Context=test1," + + "\\s+testTag22=testTagValue22,\\s+Hostname=.*$[\\n\\r]*", + Pattern.MULTILINE); + + assertTrue("Sink did not produce the expected output. Actual output was: " + + contents, expectedContentPattern.matcher(contents).matches()); + } + + /** + * Assert that the given contents match what is expected from the test + * metrics when there is pre-existing data. + * + * @param contents the file contents to test + */ + protected void assertExtraContents(String contents) { + // Note that in the below expression we allow tags and metrics to go in + // arbitrary order, but the records must be in order. + final Pattern expectedContentPattern = Pattern.compile( + "Extra stuff[\\n\\r]*" + + "^\\d+\\s+test1.testRecord1:\\s+Context=test1,\\s+" + + "(testTag1=testTagValue1,\\s+testTag2=testTagValue2|" + + "testTag2=testTagValue2,\\s+testTag1=testTagValue1)," + + "\\s+Hostname=.*,\\s+" + + "(testMetric1=1,\\s+testMetric2=2|testMetric2=2,\\s+testMetric1=1)" + + "[\\n\\r]*^\\d+\\s+test1.testRecord2:\\s+Context=test1," + + "\\s+testTag22=testTagValue22,\\s+Hostname=.*$[\\n\\r]*", + Pattern.MULTILINE); + + assertTrue("Sink did not produce the expected output. Actual output was: " + + contents, expectedContentPattern.matcher(contents).matches()); + } + + /** + * Call {@link #doWriteTest} after pre-creating the log file and filling it + * with junk data. 
+ * + * @param path the base path for the test + * @param ignoreErrors whether to ignore errors + * @param allowAppend whether to allow appends + * @param count the number of files to expect + * @return the contents of the final log file + * @throws IOException if a file system operation fails + * @throws InterruptedException if interrupted while calling + * {@link #getNowNotTopOfHour()} + * @throws URISyntaxException if the path is not a valid URI + */ + protected String doAppendTest(String path, boolean ignoreErrors, + boolean allowAppend, int count) + throws IOException, InterruptedException, URISyntaxException { + preCreateLogFile(path); + + return doWriteTest(initMetricsSystem(path, ignoreErrors, allowAppend), + path, count); + } + + /** + * Create a file at the target path with some known data in it: + * "Extra stuff". + * + * If the test run is happening within 20 seconds of the top of the hour, + * this method will sleep until the top of the hour. + * + * @param path the target path under which to create the directory for the + * current hour that will contain the log file. + * + * @throws IOException thrown if the file creation fails + * @throws InterruptedException thrown if interrupted while waiting for the + * top of the hour. + * @throws URISyntaxException thrown if the path isn't a valid URI + */ + protected void preCreateLogFile(String path) + throws IOException, InterruptedException, URISyntaxException { + preCreateLogFile(path, 1); + } + + /** + * Create files at the target path with some known data in them. Each file + * will have the same content: "Extra stuff". + * + * If the test run is happening within 20 seconds of the top of the hour, + * this method will sleep until the top of the hour. + * + * @param path the target path under which to create the directory for the + * current hour that will contain the log files. 
+ * @param numFiles the number of log files to create + * @throws IOException thrown if the file creation fails + * @throws InterruptedException thrown if interrupted while waiting for the + * top of the hour. + * @throws URISyntaxException thrown if the path isn't a valid URI + */ + protected void preCreateLogFile(String path, int numFiles) + throws IOException, InterruptedException, URISyntaxException { + Calendar now = getNowNotTopOfHour(); + + FileSystem fs = FileSystem.get(new URI(path), new Configuration()); + Path dir = new Path(path, DATE_FORMAT.format(now.getTime())); + + fs.mkdirs(dir); + + Path file = new Path(dir, + "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log"); + + // Create the log file to force the sink to append + try (FSDataOutputStream out = fs.create(file)) { + out.write("Extra stuff\n".getBytes()); + out.flush(); + } + + if (numFiles > 1) { + int count = 1; + + while (count < numFiles) { + file = new Path(dir, "testsrc-" + + InetAddress.getLocalHost().getHostName() + ".log." + count); + + // Create the log file to force the sink to append + try (FSDataOutputStream out = fs.create(file)) { + out.write("Extra stuff\n".getBytes()); + out.flush(); + } + + count += 1; + } + } + } + + /** + * Return a calendar based on the current time. If the current time is very + * near the top of the hour (less than 20 seconds), sleep until the new hour + * before returning a new Calendar instance. 
+ * + * @return a new Calendar instance that isn't near the top of the hour + * @throws InterruptedException if interrupted while sleeping + */ + public Calendar getNowNotTopOfHour() throws InterruptedException { + Calendar now = Calendar.getInstance(TimeZone.getTimeZone("GMT")); + + // If we're at the very top of the hour, sleep until the next hour + // so that we don't get confused by the directory rolling + if ((now.get(Calendar.MINUTE) == 59) && (now.get(Calendar.SECOND) > 40)) { + Thread.sleep((61 - now.get(Calendar.SECOND)) * 1000L); + now.setTime(new Date()); + } + + return now; + } + + /** + * Assert that the number of log files in the target directory is as expected. + * @param fs the target FileSystem + * @param dir the target directory path + * @param expected the expected number of files + * @throws IOException thrown if listing files fails + */ + public void assertFileCount(FileSystem fs, Path dir, int expected) + throws IOException { + RemoteIterator i = fs.listFiles(dir, true); + int count = 0; + + while (i.hasNext()) { + i.next(); + count++; + } + + assertTrue("The sink created additional unexpected log files. " + count + + "files were created", expected >= count); + assertTrue("The sink created too few log files. " + count + "files were " + + "created", expected <= count); + } + + /** + * This class is a {@link RollingFileSystemSink} wrapper that tracks whether + * an exception has been thrown during operations. 
   */
  public static class ErrorSink extends RollingFileSystemSink {
    // Set to true the first time any sink operation throws a
    // MetricsException. Tests reset this flag to false before publishing
    // metrics and assert on it afterward.
    public static volatile boolean errored = false;

    @Override
    public void putMetrics(MetricsRecord record) {
      try {
        super.putMetrics(record);
      } catch (MetricsException ex) {
        // Record the failure for the test before propagating it
        errored = true;

        throw new MetricsException(ex);
      }
    }

    @Override
    public void close() throws IOException {
      try {
        super.close();
      } catch (MetricsException ex) {
        // Record the failure for the test before propagating it
        errored = true;

        throw new MetricsException(ex);
      }
    }

    @Override
    public void flush() {
      try {
        super.flush();
      } catch (MetricsException ex) {
        // Record the failure for the test before propagating it
        errored = true;

        throw new MetricsException(ex);
      }
    }
  }
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
new file mode 100644
index 00000000000..da632351be2
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
@@ -0,0 +1,156 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics2.sink;

import org.apache.hadoop.metrics2.MetricsSystem;

import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Test the {@link RollingFileSystemSink} class in the context of the local file
 * system.
 */
public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase {
  /**
   * Test writing logs to the local file system.
   * @throws Exception when things break
   */
  @Test
  public void testWrite() throws Exception {
    String path = methodDir.getAbsolutePath();
    MetricsSystem ms = initMetricsSystem(path, false, false);

    assertMetricsContents(doWriteTest(ms, path, 1));
  }

  /**
   * Test writing logs to the local file system with the sink set to ignore
   * errors.
   * @throws Exception when things break
   */
  @Test
  public void testSilentWrite() throws Exception {
    String path = methodDir.getAbsolutePath();
    MetricsSystem ms = initMetricsSystem(path, true, false);

    assertMetricsContents(doWriteTest(ms, path, 1));
  }

  /**
   * Test writing logs to the local file system when the log file already
   * exists.
   *
   * @throws Exception when things break
   */
  @Test
  public void testExistingWrite() throws Exception {
    String path = methodDir.getAbsolutePath();

    assertMetricsContents(doAppendTest(path, false, false, 2));
  }

  /**
   * Test writing logs to the local file system when the log file and the .1
   * log file already exist.
   *
   * @throws Exception when things break
   */
  @Test
  public void testExistingWrite2() throws Exception {
    String path = methodDir.getAbsolutePath();
    MetricsSystem ms = initMetricsSystem(path, false, false);

    // Pre-create both the base log file and the .1 file so the sink must
    // roll past them to the .2 file
    preCreateLogFile(path, 2);

    assertMetricsContents(doWriteTest(ms, path, 3));
  }

  /**
   * Test writing logs to the local file system with ignore errors enabled
   * when the log file already exists.
+ * + * @throws Exception when things break + */ + @Test + public void testSilentExistingWrite() throws Exception { + String path = methodDir.getAbsolutePath(); + + assertMetricsContents(doAppendTest(path, false, false, 2)); + } + + /** + * Test that writing fails when the directory isn't writable. + */ + @Test + public void testFailedWrite() { + String path = methodDir.getAbsolutePath(); + MetricsSystem ms = initMetricsSystem(path, false, false); + + new MyMetrics1().registerWith(ms); + + methodDir.setWritable(false); + ErrorSink.errored = false; + + try { + // publish the metrics + ms.publishMetricsNow(); + + assertTrue("No exception was generated while writing metrics " + + "even though the target directory was not writable", + ErrorSink.errored); + + ms.stop(); + ms.shutdown(); + } finally { + // Make sure the dir is writable again so we can delete it at the end + methodDir.setWritable(true); + } + } + + /** + * Test that writing fails silently when the directory is not writable. + */ + @Test + public void testSilentFailedWrite() { + String path = methodDir.getAbsolutePath(); + MetricsSystem ms = initMetricsSystem(path, true, false); + + new MyMetrics1().registerWith(ms); + + methodDir.setWritable(false); + ErrorSink.errored = false; + + try { + // publish the metrics + ms.publishMetricsNow(); + + assertFalse("An exception was generated while writing metrics " + + "when the target directory was not writable, even though the " + + "sink is set to ignore errors", + ErrorSink.errored); + + ms.stop(); + ms.shutdown(); + } finally { + // Make sure the dir is writable again so we can delete it at the end + methodDir.setWritable(true); + } + } +}