diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 4da20e0d4f2..4d01857ffee 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -650,6 +650,8 @@ Release 2.9.0 - UNRELEASED
NEW FEATURES
+ HADOOP-12702. Add an HDFS metrics sink. (Daniel Templeton via kasha)
+
IMPROVEMENTS
HADOOP-12321. Make JvmPauseMonitor an AbstractService.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
new file mode 100644
index 00000000000..8271362e50d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsTag;
+
+/**
+ * This class is a metrics sink that uses
+ * {@link org.apache.hadoop.fs.FileSystem} to write the metrics logs. Every
+ * hour a new directory will be created under the path specified by the
+ * <code>basepath</code> property. All metrics will be logged to a file in the
+ * current hour's directory in a file named &lt;hostname&gt;.log, where
+ * &lt;hostname&gt; is the name of the host on which the metrics logging
+ * process is running. The base path is set by the
+ * <code>&lt;prefix&gt;.sink.&lt;instance&gt;.basepath</code> property. The
+ * time zone used to create the current hour's directory name is GMT. If the
+ * <code>basepath</code> property isn't specified, it will default to
+ * "/tmp", which is the temp directory on whatever default file
+ * system is configured for the cluster.
+ *
+ * The <code>&lt;prefix&gt;.sink.&lt;instance&gt;.ignore-error</code> property
+ * controls whether an exception is thrown when an error is encountered writing
+ * a log file. The default value is <code>true</code>. When set to
+ * <code>false</code>, file errors are quietly swallowed.
+ *
+ * The primary use of this class is for logging to HDFS. As it uses
+ * {@link org.apache.hadoop.fs.FileSystem} to access the target file system,
+ * however, it can be used to write to the local file system, Amazon S3, or any
+ * other supported file system. The base path for the sink will determine the
+ * file system used. An unqualified path will write to the default file system
+ * set by the configuration.
+ *
+ * Not all file systems support the ability to append to files. In file systems
+ * without the ability to append to files, only one writer can write to a file
+ * at a time. To allow for concurrent writes from multiple daemons on a single
+ * host, the <code>source</code> property should be set to the name of the
+ * source daemon, e.g. namenode. The value of the <code>source</code>
+ * property should typically be the same as the property's prefix. If this
+ * property is not set, the source is taken to be unknown.
+ *
+ * Instead of appending to an existing file, by default the sink
+ * will create a new file with a suffix of &quot;.&lt;n&gt;&quot;, where
+ * n is the next lowest integer that isn't already used in a file name,
+ * similar to the Hadoop daemon logs. NOTE: the file with the highest
+ * sequence number is the newest file, unlike the Hadoop daemon logs.
+ *
+ * For file systems that allow append, the sink supports appending to the
+ * existing file instead. If the <code>allow-append</code> property is set to
+ * true, the sink will instead append to the existing file on file systems that
+ * support appends. By default, the <code>allow-append</code> property is
+ * false.
+ *
+ * Note that when writing to HDFS with <code>allow-append</code> set to true,
+ * there is a minimum acceptable number of data nodes. If the number of data
+ * nodes drops below that minimum, the append will succeed, but reading the
+ * data will fail with an IOException in the DataStreamer class. The minimum
+ * number of data nodes required for a successful append is generally 2 or 3.
+ *
+ * Note also that when writing to HDFS, the file size information is not updated
+ * until the file is closed (e.g. at the top of the hour) even though the data
+ * is being written successfully. This is a known HDFS limitation that exists
+ * because of the performance cost of updating the metadata. See
+ * HDFS-5478.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RollingFileSystemSink implements MetricsSink, Closeable {
+ private static final String BASEPATH_KEY = "basepath";
+ private static final String SOURCE_KEY = "source";
+ private static final String IGNORE_ERROR_KEY = "ignore-error";
+ private static final String ALLOW_APPEND_KEY = "allow-append";
+ private static final String SOURCE_DEFAULT = "unknown";
+ private static final String BASEPATH_DEFAULT = "/tmp";
+ private static final FastDateFormat DATE_FORMAT =
+ FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT"));
+ private String source;
+ private boolean ignoreError;
+ private boolean allowAppend;
+ private Path basePath;
+ private FileSystem fileSystem;
+ // The current directory path into which we're writing files
+ private Path currentDirPath;
+ // The path to the current file into which we're writing data
+ private Path currentFilePath;
+ // The stream to which we're currently writing.
+ private PrintStream currentOutStream;
+ // We keep this only to be able to call hflush() on it.
+ private FSDataOutputStream currentFSOutStream;
+
+ @Override
+ public void init(SubsetConfiguration conf) {
+ basePath = new Path(conf.getString(BASEPATH_KEY, BASEPATH_DEFAULT));
+ source = conf.getString(SOURCE_KEY, SOURCE_DEFAULT);
+ ignoreError = conf.getBoolean(IGNORE_ERROR_KEY, false);
+ allowAppend = conf.getBoolean(ALLOW_APPEND_KEY, false);
+
+ try {
+ fileSystem = FileSystem.get(new URI(basePath.toString()),
+ new Configuration());
+ } catch (URISyntaxException ex) {
+ throw new MetricsException("The supplied filesystem base path URI"
+ + " is not a valid URI: " + basePath.toString(), ex);
+ } catch (IOException ex) {
+ throw new MetricsException("Error connecting to file system: "
+ + basePath + " [" + ex.toString() + "]", ex);
+ }
+
+ // If we're permitted to append, check if we actually can
+ if (allowAppend) {
+ allowAppend = checkAppend(fileSystem);
+ }
+ }
+
+ /**
+ * Test whether the file system supports append and return the answer.
+ * @param fs the target file system
+ */
+ private boolean checkAppend(FileSystem fs) {
+ boolean canAppend = true;
+
+ try {
+ fs.append(basePath);
+ } catch (IOException ex) {
+ if (ex.getMessage().equals("Not supported")) {
+ canAppend = false;
+ }
+ }
+
+ return canAppend;
+ }
+
+ /**
+ * Check the current directory against the time stamp. If they're not
+ * the same, create a new directory and a new log file in that directory.
+ *
+ * @throws MetricsException thrown if an error occurs while creating the
+ * new directory or new log file
+ */
+ private void rollLogDirIfNeeded() throws MetricsException {
+ String currentDir = DATE_FORMAT.format(new Date());
+ Path path = new Path(basePath, currentDir);
+
+ // We check whether currentOutStream is null instead of currentDirPath,
+ // because if currentDirPath is null, then currentOutStream is null, but
+ // currentOutStream can be null for other reasons.
+ if ((currentOutStream == null) || !path.equals(currentDirPath)) {
+ currentDirPath = path;
+
+ if (currentOutStream != null) {
+ currentOutStream.close();
+ }
+
+ try {
+ rollLogDir();
+ } catch (IOException ex) {
+ throwMetricsException("Failed to creating new log file", ex);
+ }
+ }
+ }
+
+ /**
+ * Create a new directory based on the current hour and a new log file in
+ * that directory.
+ *
+ * @throws IOException thrown if an error occurs while creating the
+ * new directory or new log file
+ */
+ private void rollLogDir() throws IOException {
+ String fileName =
+ source + "-" + InetAddress.getLocalHost().getHostName() + ".log";
+
+ Path targetFile = new Path(currentDirPath, fileName);
+ fileSystem.mkdirs(currentDirPath);
+
+ if (allowAppend) {
+ createOrAppendLogFile(targetFile);
+ } else {
+ createLogFile(targetFile);
+ }
+ }
+
+ /**
+ * Create a new log file and return the {@link FSDataOutputStream}. If a
+ * file with the specified path already exists, add a suffix, starting with 1
+ * and try again. Keep incrementing the suffix until a nonexistent target
+ * path is found.
+ *
+ * Once the file is open, {@link #currentFSOutStream},
+ * {@link #currentOutStream}, and {@link #currentFilePath} are set appropriately.
+ *
+ * @param initial the target path
+ * @throws IOException thrown if the test for file existence fails
+ */
+ private void createLogFile(Path initial) throws IOException {
+ Path currentAttempt = initial;
+ int id = 1;
+
+ while (true) {
+ // First try blindly creating the file. If we fail, it either means
+ // the file exists, or the operation actually failed. We do it this way
+ // because if we check whether the file exists, it might still be created
+ // by the time we try to create it. Creating first works like a
+ // test-and-set.
+ try {
+ currentFSOutStream = fileSystem.create(currentAttempt, false);
+ currentOutStream = new PrintStream(currentFSOutStream, true,
+ StandardCharsets.UTF_8.name());
+ currentFilePath = currentAttempt;
+ break;
+ } catch (IOException ex) {
+ // Now we can check to see if the file exists to know why we failed
+ if (fileSystem.exists(currentAttempt)) {
+ currentAttempt = new Path(initial.toString() + "." + id);
+ id += 1;
+ } else {
+ throw ex;
+ }
+ }
+ }
+ }
+
+ /**
+ * Create a new log file and return the {@link FSDataOutputStream}. If a
+ * file with the specified path already exists, open the file for append
+ * instead.
+ *
+ * Once the file is open, {@link #currentFSOutStream},
+ * {@link #currentOutStream}, and {@link #currentFilePath} are set appropriately.
+ *
+ * @param targetFile the target path
+ * @throws IOException thrown if the create or append operation fails.
+ */
+ private void createOrAppendLogFile(Path targetFile) throws IOException {
+ // First try blindly creating the file. If we fail, it either means
+ // the file exists, or the operation actually failed. We do it this way
+ // because if we check whether the file exists, it might still be created
+ // by the time we try to create it. Creating first works like a
+ // test-and-set.
+ try {
+ currentFSOutStream = fileSystem.create(targetFile, false);
+ currentOutStream = new PrintStream(currentFSOutStream, true,
+ StandardCharsets.UTF_8.name());
+ } catch (IOException ex) {
+ // Try appending instead. If we fail, if means the file doesn't
+ // actually exist yet or the operation actually failed.
+ try {
+ currentFSOutStream = fileSystem.append(targetFile);
+ currentOutStream = new PrintStream(currentFSOutStream, true,
+ StandardCharsets.UTF_8.name());
+ } catch (IOException ex2) {
+ // If the original create failed for a legit but transitory
+ // reason, the append will fail because the file now doesn't exist,
+ // resulting in a confusing stack trace. To avoid that, we set
+ // the cause of the second exception to be the first exception.
+ // It's still a tiny bit confusing, but it's enough
+ // information that someone should be able to figure it out.
+ ex2.initCause(ex);
+
+ throw ex2;
+ }
+ }
+
+ currentFilePath = targetFile;
+ }
+
+ @Override
+ public void putMetrics(MetricsRecord record) {
+ rollLogDirIfNeeded();
+
+ if (currentOutStream != null) {
+ currentOutStream.printf("%d %s.%s", record.timestamp(),
+ record.context(), record.name());
+
+ String separator = ": ";
+
+ for (MetricsTag tag : record.tags()) {
+ currentOutStream.printf("%s%s=%s", separator, tag.name(), tag.value());
+ separator = ", ";
+ }
+
+ for (AbstractMetric metric : record.metrics()) {
+ currentOutStream.printf("%s%s=%s", separator, metric.name(),
+ metric.value());
+ }
+
+ currentOutStream.println();
+
+ // If we don't hflush(), the data may not be written until the file is
+ // closed. The file won't be closed until the top of the hour *AND*
+ // another record is received. Calling hflush() makes sure that the data
+ // is complete at the top of the hour.
+ try {
+ currentFSOutStream.hflush();
+ } catch (IOException ex) {
+ throwMetricsException("Failed flushing the stream", ex);
+ }
+
+ checkForErrors("Unable to write to log file");
+ } else if (!ignoreError) {
+ throwMetricsException("Unable to write to log file");
+ }
+ }
+
+ @Override
+ public void flush() {
+ // currentOutStream is null if currentFSOutStream is null
+ if (currentFSOutStream != null) {
+ try {
+ currentFSOutStream.hflush();
+ } catch (IOException ex) {
+ throwMetricsException("Unable to flush log file", ex);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (currentOutStream != null) {
+ currentOutStream.close();
+
+ try {
+ checkForErrors("Unable to close log file");
+ } finally {
+ // Null out the streams just in case someone tries to reuse us.
+ currentOutStream = null;
+ currentFSOutStream = null;
+ }
+ }
+ }
+
+ /**
+ * If the sink isn't set to ignore errors, throw a {@link MetricsException}
+ * if the stream encountered an exception. The message parameter will be used
+ * as the new exception's message with the current file name
+ * ({@link #currentFilePath}) appended to it.
+ *
+ * @param message the exception message. The message will have the current
+ * file name ({@link #currentFilePath}) appended to it.
+ * @throws MetricsException thrown if there was an error and the sink isn't
+ * ignoring errors
+ */
+ private void checkForErrors(String message)
+ throws MetricsException {
+ if (!ignoreError && currentOutStream.checkError()) {
+ throw new MetricsException(message + ": " + currentFilePath);
+ }
+ }
+
+ /**
+ * If the sink isn't set to ignore errors, wrap the Throwable in a
+ * {@link MetricsException} and throw it. The message parameter will be used
+ * as the new exception's message with the current file name
+ * ({@link #currentFilePath}) and the Throwable's string representation
+ * appended to it.
+ *
+ * @param message the exception message. The message will have the current
+ * file name ({@link #currentFilePath}) and the Throwable's string
+ * representation appended to it.
+ * @param t the Throwable to wrap
+ */
+ private void throwMetricsException(String message, Throwable t) {
+ if (!ignoreError) {
+ throw new MetricsException(message + ": " + currentFilePath + " ["
+ + t.toString() + "]", t);
+ }
+ }
+
+ /**
+ * If the sink isn't set to ignore errors, throw a new
+ * {@link MetricsException}. The message parameter will be used as the
+ * new exception's message with the current file name
+ * ({@link #currentFilePath}) appended to it.
+ *
+ * @param message the exception message. The message will have the current
+ * file name ({@link #currentFilePath}) appended to it.
+ */
+ private void throwMetricsException(String message) {
+ if (!ignoreError) {
+ throw new MetricsException(message + ": " + currentFilePath);
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
new file mode 100644
index 00000000000..32132761e57
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.TimeZone;
+import java.util.regex.Pattern;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.annotation.Metric.Type;
+import org.apache.hadoop.metrics2.impl.ConfigBuilder;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.apache.hadoop.metrics2.impl.TestMetricsConfig;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class is a base class for testing the {@link RollingFileSystemSink}
+ * class in various contexts. It provides a number of useful utility
+ * methods for classes that extend it.
+ */
+public class RollingFileSystemSinkTestBase {
+ protected static final File ROOT_TEST_DIR =
+ new File(System.getProperty("test.build.data", "target/"),
+ "FileSystemSinkTest");
+ protected static final SimpleDateFormat DATE_FORMAT =
+ new SimpleDateFormat("yyyyMMddHH");
+ protected static File methodDir;
+
+ /**
+ * The name of the current test method.
+ */
+ @Rule
+ public TestName methodName = new TestName();
+
+ /**
+ * A sample metric class
+ */
+ @Metrics(name="testRecord1", context="test1")
+ protected class MyMetrics1 {
+ @Metric(value={"testTag1", ""}, type=Type.TAG)
+ String testTag1() { return "testTagValue1"; }
+
+ @Metric(value={"testTag2", ""}, type=Type.TAG)
+ String gettestTag2() { return "testTagValue2"; }
+
+ @Metric(value={"testMetric1", "An integer gauge"}, always=true)
+ MutableGaugeInt testMetric1;
+
+ @Metric(value={"testMetric2", "A long gauge"}, always=true)
+ MutableGaugeLong testMetric2;
+
+ public MyMetrics1 registerWith(MetricsSystem ms) {
+ return ms.register(methodName.getMethodName() + "-m1", null, this);
+ }
+ }
+
+ /**
+ * Another sample metrics class
+ */
+ @Metrics(name="testRecord2", context="test1")
+ protected class MyMetrics2 {
+ @Metric(value={"testTag22", ""}, type=Type.TAG)
+ String testTag1() { return "testTagValue22"; }
+
+ public MyMetrics2 registerWith(MetricsSystem ms) {
+ return ms.register(methodName.getMethodName() + "-m2", null, this);
+ }
+ }
+
+ /**
+ * Set the date format's timezone to GMT.
+ */
+ @BeforeClass
+ public static void setTZ() {
+ DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT"));
+ }
+
+ /**
+ * Delete the test directory for this test.
+ * @throws IOException thrown if the delete fails
+ */
+ @AfterClass
+ public static void deleteBaseDir() throws IOException {
+ FileUtils.deleteDirectory(ROOT_TEST_DIR);
+ }
+
+ /**
+ * Create the test directory for this test.
+ * @throws IOException thrown if the create fails
+ */
+ @Before
+ public void createMethodDir() throws IOException {
+ methodDir = new File(ROOT_TEST_DIR, methodName.getMethodName());
+
+ methodDir.mkdirs();
+ }
+
+ /**
+ * Set up the metrics system, start it, and return it.
+ * @param path the base path for the sink
+ * @param ignoreErrors whether the sink should ignore errors
+ * @param allowAppend whether the sink is allowed to append to existing files
+ * @return the metrics system
+ */
+ protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors,
+ boolean allowAppend) {
+ // If the prefix is not lower case, the metrics system won't be able to
+ // read any of the properties.
+ final String prefix = methodName.getMethodName().toLowerCase();
+
+ new ConfigBuilder().add("*.period", 10000)
+ .add(prefix + ".sink.mysink0.class", ErrorSink.class.getName())
+ .add(prefix + ".sink.mysink0.basepath", path)
+ .add(prefix + ".sink.mysink0.source", "testsrc")
+ .add(prefix + ".sink.mysink0.context", "test1")
+ .add(prefix + ".sink.mysink0.ignore-error", ignoreErrors)
+ .add(prefix + ".sink.mysink0.allow-append", allowAppend)
+ .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-" + prefix));
+
+ MetricsSystemImpl ms = new MetricsSystemImpl(prefix);
+
+ ms.start();
+
+ return ms;
+ }
+
+ /**
+ * Helper method that writes metrics files to a target path, reads those
+ * files, and returns the contents of all files as a single string. This
+ * method will assert that the correct number of files is found.
+ *
+ * @param ms an initialized MetricsSystem to use
+ * @param path the target path from which to read the logs
+ * @param count the number of log files to expect
+ * @return the contents of the log files
+ * @throws IOException when the log file can't be read
+ * @throws URISyntaxException when the target path is an invalid URL
+ */
+ protected String doWriteTest(MetricsSystem ms, String path, int count)
+ throws IOException, URISyntaxException {
+ final String then = DATE_FORMAT.format(new Date());
+
+ MyMetrics1 mm1 = new MyMetrics1().registerWith(ms);
+ new MyMetrics2().registerWith(ms);
+
+ mm1.testMetric1.incr();
+ mm1.testMetric2.incr(2);
+
+ ms.publishMetricsNow(); // publish the metrics
+ ms.stop();
+ ms.shutdown();
+
+ return readLogFile(path, then, count);
+ }
+
+ /**
+ * Read the log files at the target path and return the contents as a single
+ * string. This method will assert that the correct number of files is found.
+ *
+ * @param path the target path
+ * @param then when the test method began. Used to find the log directory in
+ * the case that the test run crosses the top of the hour.
+ * @param count the number of log files to expect
+ * @return the contents of the log files as a single string
+ * @throws IOException when a log file can't be read
+ * @throws URISyntaxException when the path is an invalid URI
+ */
+ protected String readLogFile(String path, String then, int count)
+ throws IOException, URISyntaxException {
+ final String now = DATE_FORMAT.format(new Date());
+ final String logFile =
+ "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log";
+ FileSystem fs = FileSystem.get(new URI(path), new Configuration());
+ StringBuilder metrics = new StringBuilder();
+ boolean found = false;
+
+ for (FileStatus status : fs.listStatus(new Path(path))) {
+ Path logDir = status.getPath();
+
+ // There are only two possible valid log directory names: the time when
+ // the test started and the current time. Anything else can be ignored.
+ if (now.equals(logDir.getName()) || then.equals(logDir.getName())) {
+ readLogData(fs, findMostRecentLogFile(fs, new Path(logDir, logFile)),
+ metrics);
+ assertFileCount(fs, logDir, count);
+ found = true;
+ }
+ }
+
+ assertTrue("No valid log directories found", found);
+
+ return metrics.toString();
+ }
+
+ /**
+ * Read the target log file and append its contents to the StringBuilder.
+ * @param fs the target FileSystem
+ * @param logFile the target file path
+ * @param metrics where to append the file contents
+ * @throws IOException thrown if the file cannot be read
+ */
+ protected void readLogData(FileSystem fs, Path logFile, StringBuilder metrics)
+ throws IOException {
+ FSDataInputStream fsin = fs.open(logFile);
+ BufferedReader in = new BufferedReader(new InputStreamReader(fsin,
+ StandardCharsets.UTF_8));
+ String line = null;
+
+ while ((line = in.readLine()) != null) {
+ metrics.append(line).append("\n");
+ }
+ }
+
+ /**
+ * Return the path to the log file to use, based on the target path.
+ * @param fs the target FileSystem
+ * @param initial the path from which to start
+ * @return the path to use
+ * @throws IOException thrown if testing for file existence fails.
+ */
+ protected Path findMostRecentLogFile(FileSystem fs, Path initial)
+ throws IOException {
+ Path logFile = null;
+ Path nextLogFile = initial;
+ int id = 1;
+
+ do {
+ logFile = nextLogFile;
+ nextLogFile = new Path(initial.toString() + "." + id);
+ id += 1;
+ } while (fs.exists(nextLogFile));
+ return logFile;
+ }
+
+ /**
+ * Assert that the given contents match what is expected from the test
+ * metrics.
+ *
+ * @param contents the file contents to test
+ */
+ protected void assertMetricsContents(String contents) {
+ // Note that in the below expression we allow tags and metrics to go in
+ // arbitrary order, but the records must be in order.
+ final Pattern expectedContentPattern = Pattern.compile(
+ "^\\d+\\s+test1.testRecord1:\\s+Context=test1,\\s+"
+ + "(testTag1=testTagValue1,\\s+testTag2=testTagValue2|"
+ + "testTag2=testTagValue2,\\s+testTag1=testTagValue1),"
+ + "\\s+Hostname=.*,\\s+"
+ + "(testMetric1=1,\\s+testMetric2=2|testMetric2=2,\\s+testMetric1=1)"
+ + "[\\n\\r]*^\\d+\\s+test1.testRecord2:\\s+Context=test1,"
+ + "\\s+testTag22=testTagValue22,\\s+Hostname=.*$[\\n\\r]*",
+ Pattern.MULTILINE);
+
+ assertTrue("Sink did not produce the expected output. Actual output was: "
+ + contents, expectedContentPattern.matcher(contents).matches());
+ }
+
+ /**
+ * Assert that the given contents match what is expected from the test
+ * metrics when there is pre-existing data.
+ *
+ * @param contents the file contents to test
+ */
+ protected void assertExtraContents(String contents) {
+ // Note that in the below expression we allow tags and metrics to go in
+ // arbitrary order, but the records must be in order.
+ final Pattern expectedContentPattern = Pattern.compile(
+ "Extra stuff[\\n\\r]*"
+ + "^\\d+\\s+test1.testRecord1:\\s+Context=test1,\\s+"
+ + "(testTag1=testTagValue1,\\s+testTag2=testTagValue2|"
+ + "testTag2=testTagValue2,\\s+testTag1=testTagValue1),"
+ + "\\s+Hostname=.*,\\s+"
+ + "(testMetric1=1,\\s+testMetric2=2|testMetric2=2,\\s+testMetric1=1)"
+ + "[\\n\\r]*^\\d+\\s+test1.testRecord2:\\s+Context=test1,"
+ + "\\s+testTag22=testTagValue22,\\s+Hostname=.*$[\\n\\r]*",
+ Pattern.MULTILINE);
+
+ assertTrue("Sink did not produce the expected output. Actual output was: "
+ + contents, expectedContentPattern.matcher(contents).matches());
+ }
+
+ /**
+ * Call {@link #doWriteTest} after pre-creating the log file and filling it
+ * with junk data.
+ *
+ * @param path the base path for the test
+ * @param ignoreErrors whether to ignore errors
+ * @param allowAppend whether to allow appends
+ * @param count the number of files to expect
+ * @return the contents of the final log file
+ * @throws IOException if a file system operation fails
+ * @throws InterruptedException if interrupted while calling
+ * {@link #getNowNotTopOfHour()}
+ * @throws URISyntaxException if the path is not a valid URI
+ */
+ protected String doAppendTest(String path, boolean ignoreErrors,
+ boolean allowAppend, int count)
+ throws IOException, InterruptedException, URISyntaxException {
+ preCreateLogFile(path);
+
+ return doWriteTest(initMetricsSystem(path, ignoreErrors, allowAppend),
+ path, count);
+ }
+
+ /**
+ * Create a file at the target path with some known data in it:
+ * "Extra stuff".
+ *
+ * If the test run is happening within 20 seconds of the top of the hour,
+ * this method will sleep until the top of the hour.
+ *
+ * @param path the target path under which to create the directory for the
+ * current hour that will contain the log file.
+ *
+ * @throws IOException thrown if the file creation fails
+ * @throws InterruptedException thrown if interrupted while waiting for the
+ * top of the hour.
+ * @throws URISyntaxException thrown if the path isn't a valid URI
+ */
+ protected void preCreateLogFile(String path)
+ throws IOException, InterruptedException, URISyntaxException {
+ preCreateLogFile(path, 1);
+ }
+
+ /**
+ * Create files at the target path with some known data in them. Each file
+ * will have the same content: "Extra stuff".
+ *
+ * If the test run is happening within 20 seconds of the top of the hour,
+ * this method will sleep until the top of the hour.
+ *
+ * @param path the target path under which to create the directory for the
+ * current hour that will contain the log files.
+ * @param numFiles the number of log files to create
+ * @throws IOException thrown if the file creation fails
+ * @throws InterruptedException thrown if interrupted while waiting for the
+ * top of the hour.
+ * @throws URISyntaxException thrown if the path isn't a valid URI
+ */
+ protected void preCreateLogFile(String path, int numFiles)
+ throws IOException, InterruptedException, URISyntaxException {
+ Calendar now = getNowNotTopOfHour();
+
+ FileSystem fs = FileSystem.get(new URI(path), new Configuration());
+ Path dir = new Path(path, DATE_FORMAT.format(now.getTime()));
+
+ fs.mkdirs(dir);
+
+ Path file = new Path(dir,
+ "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log");
+
+ // Create the log file to force the sink to append
+ try (FSDataOutputStream out = fs.create(file)) {
+ out.write("Extra stuff\n".getBytes());
+ out.flush();
+ }
+
+ if (numFiles > 1) {
+ int count = 1;
+
+ while (count < numFiles) {
+ file = new Path(dir, "testsrc-"
+ + InetAddress.getLocalHost().getHostName() + ".log." + count);
+
+ // Create the log file to force the sink to append
+ try (FSDataOutputStream out = fs.create(file)) {
+ out.write("Extra stuff\n".getBytes());
+ out.flush();
+ }
+
+ count += 1;
+ }
+ }
+ }
+
+ /**
+ * Return a calendar based on the current time. If the current time is very
+ * near the top of the hour (less than 20 seconds), sleep until the new hour
+ * before returning a new Calendar instance.
+ *
+ * @return a new Calendar instance that isn't near the top of the hour
+ * @throws InterruptedException if interrupted while sleeping
+ */
+ public Calendar getNowNotTopOfHour() throws InterruptedException {
+ Calendar now = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
+
+ // If we're at the very top of the hour, sleep until the next hour
+ // so that we don't get confused by the directory rolling
+ if ((now.get(Calendar.MINUTE) == 59) && (now.get(Calendar.SECOND) > 40)) {
+ Thread.sleep((61 - now.get(Calendar.SECOND)) * 1000L);
+ now.setTime(new Date());
+ }
+
+ return now;
+ }
+
+ /**
+ * Assert that the number of log files in the target directory is as expected.
+ * @param fs the target FileSystem
+ * @param dir the target directory path
+ * @param expected the expected number of files
+ * @throws IOException thrown if listing files fails
+ */
+ public void assertFileCount(FileSystem fs, Path dir, int expected)
+ throws IOException {
+ RemoteIterator i = fs.listFiles(dir, true);
+ int count = 0;
+
+ while (i.hasNext()) {
+ i.next();
+ count++;
+ }
+
+ assertTrue("The sink created additional unexpected log files. " + count
+ + "files were created", expected >= count);
+ assertTrue("The sink created too few log files. " + count + "files were "
+ + "created", expected <= count);
+ }
+
+ /**
+ * This class is a {@link RollingFileSystemSink} wrapper that tracks whether
+ * an exception has been thrown during operations.
+ */
+ public static class ErrorSink extends RollingFileSystemSink {
+ public static volatile boolean errored = false;
+
+ @Override
+ public void putMetrics(MetricsRecord record) {
+ try {
+ super.putMetrics(record);
+ } catch (MetricsException ex) {
+ errored = true;
+
+ throw new MetricsException(ex);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ super.close();
+ } catch (MetricsException ex) {
+ errored = true;
+
+ throw new MetricsException(ex);
+ }
+ }
+
+ @Override
+ public void flush() {
+ try {
+ super.flush();
+ } catch (MetricsException ex) {
+ errored = true;
+
+ throw new MetricsException(ex);
+ }
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
new file mode 100644
index 00000000000..da632351be2
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink;
+
+import org.apache.hadoop.metrics2.MetricsSystem;
+
+import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test the {@link RollingFileSystemSink} class in the context of the local file
+ * system.
+ */
+public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase {
+ /**
+ * Test writing logs to the local file system.
+ * @throws Exception when things break
+ */
+ @Test
+ public void testWrite() throws Exception {
+ String path = methodDir.getAbsolutePath();
+ MetricsSystem ms = initMetricsSystem(path, false, false);
+
+ assertMetricsContents(doWriteTest(ms, path, 1));
+ }
+
+ /**
+ * Test writing logs to the local file system with the sink set to ignore
+ * errors.
+ * @throws Exception when things break
+ */
+ @Test
+ public void testSilentWrite() throws Exception {
+ String path = methodDir.getAbsolutePath();
+ MetricsSystem ms = initMetricsSystem(path, true, false);
+
+ assertMetricsContents(doWriteTest(ms, path, 1));
+ }
+
+ /**
+ * Test writing logs to HDFS when the log file already exists.
+ *
+ * @throws Exception when things break
+ */
+ @Test
+ public void testExistingWrite() throws Exception {
+ String path = methodDir.getAbsolutePath();
+
+ assertMetricsContents(doAppendTest(path, false, false, 2));
+ }
+
+ /**
+ * Test writing logs to HDFS when the log file and the .1 log file already
+ * exist.
+ *
+ * @throws Exception when things break
+ */
+ @Test
+ public void testExistingWrite2() throws Exception {
+ String path = methodDir.getAbsolutePath();
+ MetricsSystem ms = initMetricsSystem(path, false, false);
+
+ preCreateLogFile(path, 2);
+
+ assertMetricsContents(doWriteTest(ms, path, 3));
+ }
+
+ /**
+ * Test writing logs to HDFS with ignore errors enabled when
+ * the log file already exists.
+ *
+ * @throws Exception when things break
+ */
+ @Test
+ public void testSilentExistingWrite() throws Exception {
+ String path = methodDir.getAbsolutePath();
+
+ assertMetricsContents(doAppendTest(path, false, false, 2));
+ }
+
+ /**
+ * Test that writing fails when the directory isn't writable.
+ */
+ @Test
+ public void testFailedWrite() {
+ String path = methodDir.getAbsolutePath();
+ MetricsSystem ms = initMetricsSystem(path, false, false);
+
+ new MyMetrics1().registerWith(ms);
+
+ methodDir.setWritable(false);
+ ErrorSink.errored = false;
+
+ try {
+ // publish the metrics
+ ms.publishMetricsNow();
+
+ assertTrue("No exception was generated while writing metrics "
+ + "even though the target directory was not writable",
+ ErrorSink.errored);
+
+ ms.stop();
+ ms.shutdown();
+ } finally {
+ // Make sure the dir is writable again so we can delete it at the end
+ methodDir.setWritable(true);
+ }
+ }
+
+ /**
+ * Test that writing fails silently when the directory is not writable.
+ */
+ @Test
+ public void testSilentFailedWrite() {
+ String path = methodDir.getAbsolutePath();
+ MetricsSystem ms = initMetricsSystem(path, true, false);
+
+ new MyMetrics1().registerWith(ms);
+
+ methodDir.setWritable(false);
+ ErrorSink.errored = false;
+
+ try {
+ // publish the metrics
+ ms.publishMetricsNow();
+
+ assertFalse("An exception was generated while writing metrics "
+ + "when the target directory was not writable, even though the "
+ + "sink is set to ignore errors",
+ ErrorSink.errored);
+
+ ms.stop();
+ ms.shutdown();
+ } finally {
+ // Make sure the dir is writable again so we can delete it at the end
+ methodDir.setWritable(true);
+ }
+ }
+}