HADOOP-12702. Add an HDFS metrics sink. (Daniel Templeton via kasha)
(cherry picked from commit ee005e010c
)
This commit is contained in:
parent
36f0e73882
commit
363a8212ec
|
@ -6,6 +6,8 @@ Release 2.9.0 - UNRELEASED
|
|||
|
||||
NEW FEATURES
|
||||
|
||||
HADOOP-12702. Add an HDFS metrics sink. (Daniel Templeton via kasha)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HADOOP-12321. Make JvmPauseMonitor an AbstractService.
|
||||
|
|
|
@ -0,0 +1,420 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.metrics2.sink;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Date;
|
||||
import java.util.TimeZone;
|
||||
|
||||
import org.apache.commons.configuration.SubsetConfiguration;
|
||||
import org.apache.commons.lang.time.FastDateFormat;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.metrics2.AbstractMetric;
|
||||
import org.apache.hadoop.metrics2.MetricsException;
|
||||
import org.apache.hadoop.metrics2.MetricsRecord;
|
||||
import org.apache.hadoop.metrics2.MetricsSink;
|
||||
import org.apache.hadoop.metrics2.MetricsTag;
|
||||
|
||||
/**
|
||||
* This class is a metrics sink that uses
|
||||
* {@link org.apache.hadoop.fs.FileSystem} to write the metrics logs. Every
|
||||
* hour a new directory will be created under the path specified by the
|
||||
* <code>basepath</code> property. All metrics will be logged to a file in the
|
||||
* current hour's directory in a file named <hostname>.log, where
|
||||
* <hostname> is the name of the host on which the metrics logging
|
||||
* process is running. The base path is set by the
|
||||
* <code><prefix>.sink.<instance>.basepath</code> property. The
|
||||
* time zone used to create the current hour's directory name is GMT. If the
|
||||
* <code>basepath</code> property isn't specified, it will default to
|
||||
* "/tmp", which is the temp directory on whatever default file
|
||||
* system is configured for the cluster.
|
||||
*
|
||||
* The <code><prefix>.sink.<instance>.ignore-error</code> property
|
||||
* controls whether an exception is thrown when an error is encountered writing
|
||||
* a log file. The default value is <code>true</code>. When set to
|
||||
* <code>false</code>, file errors are quietly swallowed.
|
||||
*
|
||||
* The primary use of this class is for logging to HDFS. As it uses
|
||||
* {@link org.apache.hadoop.fs.FileSystem} to access the target file system,
|
||||
* however, it can be used to write to the local file system, Amazon S3, or any
|
||||
* other supported file system. The base path for the sink will determine the
|
||||
* file system used. An unqualified path will write to the default file system
|
||||
* set by the configuration.
|
||||
*
|
||||
* Not all file systems support the ability to append to files. In file systems
|
||||
* without the ability to append to files, only one writer can write to a file
|
||||
* at a time. To allow for concurrent writes from multiple daemons on a single
|
||||
* host, the <code>source</code> property should be set to the name of the
|
||||
* source daemon, e.g. <i>namenode</i>. The value of the <code>source</code>
|
||||
* property should typically be the same as the property's prefix. If this
|
||||
* property is not set, the source is taken to be <i>unknown</i>.
|
||||
*
|
||||
* Instead of appending to an existing file, by default the sink
|
||||
* will create a new file with a suffix of ".<n>&quet;, where
|
||||
* <i>n</i> is the next lowest integer that isn't already used in a file name,
|
||||
* similar to the Hadoop daemon logs. NOTE: the file with the <b>highest</b>
|
||||
* sequence number is the <b>newest</b> file, unlike the Hadoop daemon logs.
|
||||
*
|
||||
* For file systems that allow append, the sink supports appending to the
|
||||
* existing file instead. If the <code>allow-append</code> property is set to
|
||||
* true, the sink will instead append to the existing file on file systems that
|
||||
* support appends. By default, the <code>allow-append</code> property is
|
||||
* false.
|
||||
*
|
||||
* Note that when writing to HDFS with <code>allow-append</code> set to true,
|
||||
* there is a minimum acceptable number of data nodes. If the number of data
|
||||
* nodes drops below that minimum, the append will succeed, but reading the
|
||||
* data will fail with an IOException in the DataStreamer class. The minimum
|
||||
* number of data nodes required for a successful append is generally 2 or 3.
|
||||
*
|
||||
* Note also that when writing to HDFS, the file size information is not updated
|
||||
* until the file is closed (e.g. at the top of the hour) even though the data
|
||||
* is being written successfully. This is a known HDFS limitation that exists
|
||||
* because of the performance cost of updating the metadata. See
|
||||
* <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class RollingFileSystemSink implements MetricsSink, Closeable {
|
||||
private static final String BASEPATH_KEY = "basepath";
|
||||
private static final String SOURCE_KEY = "source";
|
||||
private static final String IGNORE_ERROR_KEY = "ignore-error";
|
||||
private static final String ALLOW_APPEND_KEY = "allow-append";
|
||||
private static final String SOURCE_DEFAULT = "unknown";
|
||||
private static final String BASEPATH_DEFAULT = "/tmp";
|
||||
private static final FastDateFormat DATE_FORMAT =
|
||||
FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT"));
|
||||
private String source;
|
||||
private boolean ignoreError;
|
||||
private boolean allowAppend;
|
||||
private Path basePath;
|
||||
private FileSystem fileSystem;
|
||||
// The current directory path into which we're writing files
|
||||
private Path currentDirPath;
|
||||
// The path to the current file into which we're writing data
|
||||
private Path currentFilePath;
|
||||
// The stream to which we're currently writing.
|
||||
private PrintStream currentOutStream;
|
||||
// We keep this only to be able to call hsynch() on it.
|
||||
private FSDataOutputStream currentFSOutStream;
|
||||
|
||||
@Override
|
||||
public void init(SubsetConfiguration conf) {
|
||||
basePath = new Path(conf.getString(BASEPATH_KEY, BASEPATH_DEFAULT));
|
||||
source = conf.getString(SOURCE_KEY, SOURCE_DEFAULT);
|
||||
ignoreError = conf.getBoolean(IGNORE_ERROR_KEY, false);
|
||||
allowAppend = conf.getBoolean(ALLOW_APPEND_KEY, false);
|
||||
|
||||
try {
|
||||
fileSystem = FileSystem.get(new URI(basePath.toString()),
|
||||
new Configuration());
|
||||
} catch (URISyntaxException ex) {
|
||||
throw new MetricsException("The supplied filesystem base path URI"
|
||||
+ " is not a valid URI: " + basePath.toString(), ex);
|
||||
} catch (IOException ex) {
|
||||
throw new MetricsException("Error connecting to file system: "
|
||||
+ basePath + " [" + ex.toString() + "]", ex);
|
||||
}
|
||||
|
||||
// If we're permitted to append, check if we actually can
|
||||
if (allowAppend) {
|
||||
allowAppend = checkAppend(fileSystem);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test whether the file system supports append and return the answer.
|
||||
* @param fs the target file system
|
||||
*/
|
||||
private boolean checkAppend(FileSystem fs) {
|
||||
boolean canAppend = true;
|
||||
|
||||
try {
|
||||
fs.append(basePath);
|
||||
} catch (IOException ex) {
|
||||
if (ex.getMessage().equals("Not supported")) {
|
||||
canAppend = false;
|
||||
}
|
||||
}
|
||||
|
||||
return canAppend;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check the current directory against the time stamp. If they're not
|
||||
* the same, create a new directory and a new log file in that directory.
|
||||
*
|
||||
* @throws MetricsException thrown if an error occurs while creating the
|
||||
* new directory or new log file
|
||||
*/
|
||||
private void rollLogDirIfNeeded() throws MetricsException {
|
||||
String currentDir = DATE_FORMAT.format(new Date());
|
||||
Path path = new Path(basePath, currentDir);
|
||||
|
||||
// We check whether currentOutStream is null instead of currentDirPath,
|
||||
// because if currentDirPath is null, then currentOutStream is null, but
|
||||
// currentOutStream can be null for other reasons.
|
||||
if ((currentOutStream == null) || !path.equals(currentDirPath)) {
|
||||
currentDirPath = path;
|
||||
|
||||
if (currentOutStream != null) {
|
||||
currentOutStream.close();
|
||||
}
|
||||
|
||||
try {
|
||||
rollLogDir();
|
||||
} catch (IOException ex) {
|
||||
throwMetricsException("Failed to creating new log file", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new directory based on the current hour and a new log file in
|
||||
* that directory.
|
||||
*
|
||||
* @throws IOException thrown if an error occurs while creating the
|
||||
* new directory or new log file
|
||||
*/
|
||||
private void rollLogDir() throws IOException {
|
||||
String fileName =
|
||||
source + "-" + InetAddress.getLocalHost().getHostName() + ".log";
|
||||
|
||||
Path targetFile = new Path(currentDirPath, fileName);
|
||||
fileSystem.mkdirs(currentDirPath);
|
||||
|
||||
if (allowAppend) {
|
||||
createOrAppendLogFile(targetFile);
|
||||
} else {
|
||||
createLogFile(targetFile);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new log file and return the {@link FSDataOutputStream}. If a
|
||||
* file with the specified path already exists, add a suffix, starting with 1
|
||||
* and try again. Keep incrementing the suffix until a nonexistent target
|
||||
* path is found.
|
||||
*
|
||||
* Once the file is open, update {@link #currentFSOutStream},
|
||||
* {@link #currentOutStream}, and {@#link #currentFile} are set appropriately.
|
||||
*
|
||||
* @param initial the target path
|
||||
* @throws IOException thrown if the call to see if the exists fails
|
||||
*/
|
||||
private void createLogFile(Path initial) throws IOException {
|
||||
Path currentAttempt = initial;
|
||||
int id = 1;
|
||||
|
||||
while (true) {
|
||||
// First try blindly creating the file. If we fail, it either means
|
||||
// the file exists, or the operation actually failed. We do it this way
|
||||
// because if we check whether the file exists, it might still be created
|
||||
// by the time we try to create it. Creating first works like a
|
||||
// test-and-set.
|
||||
try {
|
||||
currentFSOutStream = fileSystem.create(currentAttempt, false);
|
||||
currentOutStream = new PrintStream(currentFSOutStream, true,
|
||||
StandardCharsets.UTF_8.name());
|
||||
currentFilePath = currentAttempt;
|
||||
break;
|
||||
} catch (IOException ex) {
|
||||
// Now we can check to see if the file exists to know why we failed
|
||||
if (fileSystem.exists(currentAttempt)) {
|
||||
currentAttempt = new Path(initial.toString() + "." + id);
|
||||
id += 1;
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new log file and return the {@link FSDataOutputStream}. If a
|
||||
* file with the specified path already exists, open the file for append
|
||||
* instead.
|
||||
*
|
||||
* Once the file is open, update {@link #currentFSOutStream},
|
||||
* {@link #currentOutStream}, and {@#link #currentFile} are set appropriately.
|
||||
*
|
||||
* @param initial the target path
|
||||
* @throws IOException thrown if the call to see the append operation fails.
|
||||
*/
|
||||
private void createOrAppendLogFile(Path targetFile) throws IOException {
|
||||
// First try blindly creating the file. If we fail, it either means
|
||||
// the file exists, or the operation actually failed. We do it this way
|
||||
// because if we check whether the file exists, it might still be created
|
||||
// by the time we try to create it. Creating first works like a
|
||||
// test-and-set.
|
||||
try {
|
||||
currentFSOutStream = fileSystem.create(targetFile, false);
|
||||
currentOutStream = new PrintStream(currentFSOutStream, true,
|
||||
StandardCharsets.UTF_8.name());
|
||||
} catch (IOException ex) {
|
||||
// Try appending instead. If we fail, if means the file doesn't
|
||||
// actually exist yet or the operation actually failed.
|
||||
try {
|
||||
currentFSOutStream = fileSystem.append(targetFile);
|
||||
currentOutStream = new PrintStream(currentFSOutStream, true,
|
||||
StandardCharsets.UTF_8.name());
|
||||
} catch (IOException ex2) {
|
||||
// If the original create failed for a legit but transitory
|
||||
// reason, the append will fail because the file now doesn't exist,
|
||||
// resulting in a confusing stack trace. To avoid that, we set
|
||||
// the cause of the second exception to be the first exception.
|
||||
// It's still a tiny bit confusing, but it's enough
|
||||
// information that someone should be able to figure it out.
|
||||
ex2.initCause(ex);
|
||||
|
||||
throw ex2;
|
||||
}
|
||||
}
|
||||
|
||||
currentFilePath = targetFile;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putMetrics(MetricsRecord record) {
|
||||
rollLogDirIfNeeded();
|
||||
|
||||
if (currentOutStream != null) {
|
||||
currentOutStream.printf("%d %s.%s", record.timestamp(),
|
||||
record.context(), record.name());
|
||||
|
||||
String separator = ": ";
|
||||
|
||||
for (MetricsTag tag : record.tags()) {
|
||||
currentOutStream.printf("%s%s=%s", separator, tag.name(), tag.value());
|
||||
separator = ", ";
|
||||
}
|
||||
|
||||
for (AbstractMetric metric : record.metrics()) {
|
||||
currentOutStream.printf("%s%s=%s", separator, metric.name(),
|
||||
metric.value());
|
||||
}
|
||||
|
||||
currentOutStream.println();
|
||||
|
||||
// If we don't hflush(), the data may not be written until the file is
|
||||
// closed. The file won't be closed until the top of the hour *AND*
|
||||
// another record is received. Calling hflush() makes sure that the data
|
||||
// is complete at the top of the hour.
|
||||
try {
|
||||
currentFSOutStream.hflush();
|
||||
} catch (IOException ex) {
|
||||
throwMetricsException("Failed flushing the stream", ex);
|
||||
}
|
||||
|
||||
checkForErrors("Unable to write to log file");
|
||||
} else if (!ignoreError) {
|
||||
throwMetricsException("Unable to write to log file");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
// currentOutStream is null if currentFSOutStream is null
|
||||
if (currentFSOutStream != null) {
|
||||
try {
|
||||
currentFSOutStream.hflush();
|
||||
} catch (IOException ex) {
|
||||
throwMetricsException("Unable to flush log file", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (currentOutStream != null) {
|
||||
currentOutStream.close();
|
||||
|
||||
try {
|
||||
checkForErrors("Unable to close log file");
|
||||
} finally {
|
||||
// Null out the streams just in case someone tries to reuse us.
|
||||
currentOutStream = null;
|
||||
currentFSOutStream = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If the sink isn't set to ignore errors, throw a {@link MetricsException}
|
||||
* if the stream encountered an exception. The message parameter will be used
|
||||
* as the new exception's message with the current file name
|
||||
* ({@link #currentFilePath}) appended to it.
|
||||
*
|
||||
* @param message the exception message. The message will have the current
|
||||
* file name ({@link #currentFilePath}) appended to it.
|
||||
* @throws MetricsException thrown if there was an error and the sink isn't
|
||||
* ignoring errors
|
||||
*/
|
||||
private void checkForErrors(String message)
|
||||
throws MetricsException {
|
||||
if (!ignoreError && currentOutStream.checkError()) {
|
||||
throw new MetricsException(message + ": " + currentFilePath);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If the sink isn't set to ignore errors, wrap the Throwable in a
|
||||
* {@link MetricsException} and throw it. The message parameter will be used
|
||||
* as the new exception's message with the current file name
|
||||
* ({@link #currentFilePath}) and the Throwable's string representation
|
||||
* appended to it.
|
||||
*
|
||||
* @param message the exception message. The message will have the current
|
||||
* file name ({@link #currentFilePath}) and the Throwable's string
|
||||
* representation appended to it.
|
||||
* @param t the Throwable to wrap
|
||||
*/
|
||||
private void throwMetricsException(String message, Throwable t) {
|
||||
if (!ignoreError) {
|
||||
throw new MetricsException(message + ": " + currentFilePath + " ["
|
||||
+ t.toString() + "]", t);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If the sink isn't set to ignore errors, throw a new
|
||||
* {@link MetricsException}. The message parameter will be used as the
|
||||
* new exception's message with the current file name
|
||||
* ({@link #currentFilePath}) appended to it.
|
||||
*
|
||||
* @param message the exception message. The message will have the current
|
||||
* file name ({@link #currentFilePath}) appended to it.
|
||||
*/
|
||||
private void throwMetricsException(String message) {
|
||||
if (!ignoreError) {
|
||||
throw new MetricsException(message + ": " + currentFilePath);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,506 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.metrics2.sink;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.InetAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
import java.util.TimeZone;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.metrics2.MetricsException;
|
||||
import org.apache.hadoop.metrics2.MetricsRecord;
|
||||
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric.Type;
|
||||
import org.apache.hadoop.metrics2.impl.ConfigBuilder;
|
||||
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
|
||||
import org.apache.hadoop.metrics2.impl.TestMetricsConfig;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.rules.TestName;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* This class is a base class for testing the {@link RollingFileSystemSink}
|
||||
* class in various contexts. It provides the a number of useful utility
|
||||
* methods for classes that extend it.
|
||||
*/
|
||||
public class RollingFileSystemSinkTestBase {
|
||||
protected static final File ROOT_TEST_DIR =
|
||||
new File(System.getProperty("test.build.data", "target/"),
|
||||
"FileSystemSinkTest");
|
||||
protected static final SimpleDateFormat DATE_FORMAT =
|
||||
new SimpleDateFormat("yyyyMMddHH");
|
||||
protected static File methodDir;
|
||||
|
||||
/**
|
||||
* The name of the current test method.
|
||||
*/
|
||||
@Rule
|
||||
public TestName methodName = new TestName();
|
||||
|
||||
/**
|
||||
* A sample metric class
|
||||
*/
|
||||
@Metrics(name="testRecord1", context="test1")
|
||||
protected class MyMetrics1 {
|
||||
@Metric(value={"testTag1", ""}, type=Type.TAG)
|
||||
String testTag1() { return "testTagValue1"; }
|
||||
|
||||
@Metric(value={"testTag2", ""}, type=Type.TAG)
|
||||
String gettestTag2() { return "testTagValue2"; }
|
||||
|
||||
@Metric(value={"testMetric1", "An integer gauge"}, always=true)
|
||||
MutableGaugeInt testMetric1;
|
||||
|
||||
@Metric(value={"testMetric2", "A long gauge"}, always=true)
|
||||
MutableGaugeLong testMetric2;
|
||||
|
||||
public MyMetrics1 registerWith(MetricsSystem ms) {
|
||||
return ms.register(methodName.getMethodName() + "-m1", null, this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Another sample metrics class
|
||||
*/
|
||||
@Metrics(name="testRecord2", context="test1")
|
||||
protected class MyMetrics2 {
|
||||
@Metric(value={"testTag22", ""}, type=Type.TAG)
|
||||
String testTag1() { return "testTagValue22"; }
|
||||
|
||||
public MyMetrics2 registerWith(MetricsSystem ms) {
|
||||
return ms.register(methodName.getMethodName() + "-m2", null, this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the date format's timezone to GMT.
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void setTZ() {
|
||||
DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the test directory for this test.
|
||||
* @throws IOException thrown if the delete fails
|
||||
*/
|
||||
@AfterClass
|
||||
public static void deleteBaseDir() throws IOException {
|
||||
FileUtils.deleteDirectory(ROOT_TEST_DIR);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the test directory for this test.
|
||||
* @throws IOException thrown if the create fails
|
||||
*/
|
||||
@Before
|
||||
public void createMethodDir() throws IOException {
|
||||
methodDir = new File(ROOT_TEST_DIR, methodName.getMethodName());
|
||||
|
||||
methodDir.mkdirs();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up the metrics system, start it, and return it.
|
||||
* @param path the base path for the sink
|
||||
* @param ignoreErrors whether the sink should ignore errors
|
||||
* @param allowAppend whether the sink is allowed to append to existing files
|
||||
* @return the metrics system
|
||||
*/
|
||||
protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors,
|
||||
boolean allowAppend) {
|
||||
// If the prefix is not lower case, the metrics system won't be able to
|
||||
// read any of the properties.
|
||||
final String prefix = methodName.getMethodName().toLowerCase();
|
||||
|
||||
new ConfigBuilder().add("*.period", 10000)
|
||||
.add(prefix + ".sink.mysink0.class", ErrorSink.class.getName())
|
||||
.add(prefix + ".sink.mysink0.basepath", path)
|
||||
.add(prefix + ".sink.mysink0.source", "testsrc")
|
||||
.add(prefix + ".sink.mysink0.context", "test1")
|
||||
.add(prefix + ".sink.mysink0.ignore-error", ignoreErrors)
|
||||
.add(prefix + ".sink.mysink0.allow-append", allowAppend)
|
||||
.save(TestMetricsConfig.getTestFilename("hadoop-metrics2-" + prefix));
|
||||
|
||||
MetricsSystemImpl ms = new MetricsSystemImpl(prefix);
|
||||
|
||||
ms.start();
|
||||
|
||||
return ms;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method that writes metrics files to a target path, reads those
|
||||
* files, and returns the contents of all files as a single string. This
|
||||
* method will assert that the correct number of files is found.
|
||||
*
|
||||
* @param ms an initialized MetricsSystem to use
|
||||
* @param path the target path from which to read the logs
|
||||
* @param count the number of log files to expect
|
||||
* @return the contents of the log files
|
||||
* @throws IOException when the log file can't be read
|
||||
* @throws URISyntaxException when the target path is an invalid URL
|
||||
*/
|
||||
protected String doWriteTest(MetricsSystem ms, String path, int count)
|
||||
throws IOException, URISyntaxException {
|
||||
final String then = DATE_FORMAT.format(new Date());
|
||||
|
||||
MyMetrics1 mm1 = new MyMetrics1().registerWith(ms);
|
||||
new MyMetrics2().registerWith(ms);
|
||||
|
||||
mm1.testMetric1.incr();
|
||||
mm1.testMetric2.incr(2);
|
||||
|
||||
ms.publishMetricsNow(); // publish the metrics
|
||||
ms.stop();
|
||||
ms.shutdown();
|
||||
|
||||
return readLogFile(path, then, count);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the log files at the target path and return the contents as a single
|
||||
* string. This method will assert that the correct number of files is found.
|
||||
*
|
||||
* @param path the target path
|
||||
* @param then when the test method began. Used to find the log directory in
|
||||
* the case that the test run crosses the top of the hour.
|
||||
* @param count the number of log files to expect
|
||||
* @return
|
||||
* @throws IOException
|
||||
* @throws URISyntaxException
|
||||
*/
|
||||
protected String readLogFile(String path, String then, int count)
|
||||
throws IOException, URISyntaxException {
|
||||
final String now = DATE_FORMAT.format(new Date());
|
||||
final String logFile =
|
||||
"testsrc-" + InetAddress.getLocalHost().getHostName() + ".log";
|
||||
FileSystem fs = FileSystem.get(new URI(path), new Configuration());
|
||||
StringBuilder metrics = new StringBuilder();
|
||||
boolean found = false;
|
||||
|
||||
for (FileStatus status : fs.listStatus(new Path(path))) {
|
||||
Path logDir = status.getPath();
|
||||
|
||||
// There are only two possible valid log directory names: the time when
|
||||
// the test started and the current time. Anything else can be ignored.
|
||||
if (now.equals(logDir.getName()) || then.equals(logDir.getName())) {
|
||||
readLogData(fs, findMostRecentLogFile(fs, new Path(logDir, logFile)),
|
||||
metrics);
|
||||
assertFileCount(fs, logDir, count);
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue("No valid log directories found", found);
|
||||
|
||||
return metrics.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the target log file and append its contents to the StringBuilder.
|
||||
* @param fs the target FileSystem
|
||||
* @param logFile the target file path
|
||||
* @param metrics where to append the file contents
|
||||
* @throws IOException thrown if the file cannot be read
|
||||
*/
|
||||
protected void readLogData(FileSystem fs, Path logFile, StringBuilder metrics)
|
||||
throws IOException {
|
||||
FSDataInputStream fsin = fs.open(logFile);
|
||||
BufferedReader in = new BufferedReader(new InputStreamReader(fsin,
|
||||
StandardCharsets.UTF_8));
|
||||
String line = null;
|
||||
|
||||
while ((line = in.readLine()) != null) {
|
||||
metrics.append(line).append("\n");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the path to the log file to use, based on the target path.
|
||||
* @param fs the target FileSystem
|
||||
* @param initial the path from which to start
|
||||
* @return the path to use
|
||||
* @throws IOException thrown if testing for file existence fails.
|
||||
*/
|
||||
protected Path findMostRecentLogFile(FileSystem fs, Path initial)
|
||||
throws IOException {
|
||||
Path logFile = null;
|
||||
Path nextLogFile = initial;
|
||||
int id = 1;
|
||||
|
||||
do {
|
||||
logFile = nextLogFile;
|
||||
nextLogFile = new Path(initial.toString() + "." + id);
|
||||
id += 1;
|
||||
} while (fs.exists(nextLogFile));
|
||||
return logFile;
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that the given contents match what is expected from the test
|
||||
* metrics.
|
||||
*
|
||||
* @param contents the file contents to test
|
||||
*/
|
||||
protected void assertMetricsContents(String contents) {
|
||||
// Note that in the below expression we allow tags and metrics to go in
|
||||
// arbitrary order, but the records must be in order.
|
||||
final Pattern expectedContentPattern = Pattern.compile(
|
||||
"^\\d+\\s+test1.testRecord1:\\s+Context=test1,\\s+"
|
||||
+ "(testTag1=testTagValue1,\\s+testTag2=testTagValue2|"
|
||||
+ "testTag2=testTagValue2,\\s+testTag1=testTagValue1),"
|
||||
+ "\\s+Hostname=.*,\\s+"
|
||||
+ "(testMetric1=1,\\s+testMetric2=2|testMetric2=2,\\s+testMetric1=1)"
|
||||
+ "[\\n\\r]*^\\d+\\s+test1.testRecord2:\\s+Context=test1,"
|
||||
+ "\\s+testTag22=testTagValue22,\\s+Hostname=.*$[\\n\\r]*",
|
||||
Pattern.MULTILINE);
|
||||
|
||||
assertTrue("Sink did not produce the expected output. Actual output was: "
|
||||
+ contents, expectedContentPattern.matcher(contents).matches());
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that the given contents match what is expected from the test
|
||||
* metrics when there is pre-existing data.
|
||||
*
|
||||
* @param contents the file contents to test
|
||||
*/
|
||||
protected void assertExtraContents(String contents) {
|
||||
// Note that in the below expression we allow tags and metrics to go in
|
||||
// arbitrary order, but the records must be in order.
|
||||
final Pattern expectedContentPattern = Pattern.compile(
|
||||
"Extra stuff[\\n\\r]*"
|
||||
+ "^\\d+\\s+test1.testRecord1:\\s+Context=test1,\\s+"
|
||||
+ "(testTag1=testTagValue1,\\s+testTag2=testTagValue2|"
|
||||
+ "testTag2=testTagValue2,\\s+testTag1=testTagValue1),"
|
||||
+ "\\s+Hostname=.*,\\s+"
|
||||
+ "(testMetric1=1,\\s+testMetric2=2|testMetric2=2,\\s+testMetric1=1)"
|
||||
+ "[\\n\\r]*^\\d+\\s+test1.testRecord2:\\s+Context=test1,"
|
||||
+ "\\s+testTag22=testTagValue22,\\s+Hostname=.*$[\\n\\r]*",
|
||||
Pattern.MULTILINE);
|
||||
|
||||
assertTrue("Sink did not produce the expected output. Actual output was: "
|
||||
+ contents, expectedContentPattern.matcher(contents).matches());
|
||||
}
|
||||
|
||||
/**
|
||||
* Call {@link #doWriteTest} after pre-creating the log file and filling it
|
||||
* with junk data.
|
||||
*
|
||||
* @param path the base path for the test
|
||||
* @param ignoreErrors whether to ignore errors
|
||||
* @param allowAppend whether to allow appends
|
||||
* @param count the number of files to expect
|
||||
* @return the contents of the final log file
|
||||
* @throws IOException if a file system operation fails
|
||||
* @throws InterruptedException if interrupted while calling
|
||||
* {@link #getNowNotTopOfHour()}
|
||||
* @throws URISyntaxException if the path is not a valid URI
|
||||
*/
|
||||
protected String doAppendTest(String path, boolean ignoreErrors,
|
||||
boolean allowAppend, int count)
|
||||
throws IOException, InterruptedException, URISyntaxException {
|
||||
preCreateLogFile(path);
|
||||
|
||||
return doWriteTest(initMetricsSystem(path, ignoreErrors, allowAppend),
|
||||
path, count);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a file at the target path with some known data in it:
|
||||
* "Extra stuff".
|
||||
*
|
||||
* If the test run is happening within 20 seconds of the top of the hour,
|
||||
* this method will sleep until the top of the hour.
|
||||
*
|
||||
* @param path the target path under which to create the directory for the
|
||||
* current hour that will contain the log file.
|
||||
*
|
||||
* @throws IOException thrown if the file creation fails
|
||||
* @throws InterruptedException thrown if interrupted while waiting for the
|
||||
* top of the hour.
|
||||
* @throws URISyntaxException thrown if the path isn't a valid URI
|
||||
*/
|
||||
protected void preCreateLogFile(String path)
|
||||
throws IOException, InterruptedException, URISyntaxException {
|
||||
preCreateLogFile(path, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create files at the target path with some known data in them. Each file
|
||||
* will have the same content: "Extra stuff".
|
||||
*
|
||||
* If the test run is happening within 20 seconds of the top of the hour,
|
||||
* this method will sleep until the top of the hour.
|
||||
*
|
||||
* @param path the target path under which to create the directory for the
|
||||
* current hour that will contain the log files.
|
||||
* @param numFiles the number of log files to create
|
||||
* @throws IOException thrown if the file creation fails
|
||||
* @throws InterruptedException thrown if interrupted while waiting for the
|
||||
* top of the hour.
|
||||
* @throws URISyntaxException thrown if the path isn't a valid URI
|
||||
*/
|
||||
protected void preCreateLogFile(String path, int numFiles)
|
||||
throws IOException, InterruptedException, URISyntaxException {
|
||||
Calendar now = getNowNotTopOfHour();
|
||||
|
||||
FileSystem fs = FileSystem.get(new URI(path), new Configuration());
|
||||
Path dir = new Path(path, DATE_FORMAT.format(now.getTime()));
|
||||
|
||||
fs.mkdirs(dir);
|
||||
|
||||
Path file = new Path(dir,
|
||||
"testsrc-" + InetAddress.getLocalHost().getHostName() + ".log");
|
||||
|
||||
// Create the log file to force the sink to append
|
||||
try (FSDataOutputStream out = fs.create(file)) {
|
||||
out.write("Extra stuff\n".getBytes());
|
||||
out.flush();
|
||||
}
|
||||
|
||||
if (numFiles > 1) {
|
||||
int count = 1;
|
||||
|
||||
while (count < numFiles) {
|
||||
file = new Path(dir, "testsrc-"
|
||||
+ InetAddress.getLocalHost().getHostName() + ".log." + count);
|
||||
|
||||
// Create the log file to force the sink to append
|
||||
try (FSDataOutputStream out = fs.create(file)) {
|
||||
out.write("Extra stuff\n".getBytes());
|
||||
out.flush();
|
||||
}
|
||||
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a calendar based on the current time. If the current time is very
|
||||
* near the top of the hour (less than 20 seconds), sleep until the new hour
|
||||
* before returning a new Calendar instance.
|
||||
*
|
||||
* @return a new Calendar instance that isn't near the top of the hour
|
||||
* @throws InterruptedException if interrupted while sleeping
|
||||
*/
|
||||
public Calendar getNowNotTopOfHour() throws InterruptedException {
|
||||
Calendar now = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
|
||||
|
||||
// If we're at the very top of the hour, sleep until the next hour
|
||||
// so that we don't get confused by the directory rolling
|
||||
if ((now.get(Calendar.MINUTE) == 59) && (now.get(Calendar.SECOND) > 40)) {
|
||||
Thread.sleep((61 - now.get(Calendar.SECOND)) * 1000L);
|
||||
now.setTime(new Date());
|
||||
}
|
||||
|
||||
return now;
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that the number of log files in the target directory is as expected.
|
||||
* @param fs the target FileSystem
|
||||
* @param dir the target directory path
|
||||
* @param expected the expected number of files
|
||||
* @throws IOException thrown if listing files fails
|
||||
*/
|
||||
public void assertFileCount(FileSystem fs, Path dir, int expected)
|
||||
throws IOException {
|
||||
RemoteIterator<LocatedFileStatus> i = fs.listFiles(dir, true);
|
||||
int count = 0;
|
||||
|
||||
while (i.hasNext()) {
|
||||
i.next();
|
||||
count++;
|
||||
}
|
||||
|
||||
assertTrue("The sink created additional unexpected log files. " + count
|
||||
+ "files were created", expected >= count);
|
||||
assertTrue("The sink created too few log files. " + count + "files were "
|
||||
+ "created", expected <= count);
|
||||
}
|
||||
|
||||
/**
|
||||
* This class is a {@link RollingFileSystemSink} wrapper that tracks whether
|
||||
* an exception has been thrown during operations.
|
||||
*/
|
||||
public static class ErrorSink extends RollingFileSystemSink {
|
||||
public static volatile boolean errored = false;
|
||||
|
||||
@Override
|
||||
public void putMetrics(MetricsRecord record) {
|
||||
try {
|
||||
super.putMetrics(record);
|
||||
} catch (MetricsException ex) {
|
||||
errored = true;
|
||||
|
||||
throw new MetricsException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
try {
|
||||
super.close();
|
||||
} catch (MetricsException ex) {
|
||||
errored = true;
|
||||
|
||||
throw new MetricsException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
try {
|
||||
super.flush();
|
||||
} catch (MetricsException ex) {
|
||||
errored = true;
|
||||
|
||||
throw new MetricsException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,156 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.metrics2.sink;
|
||||
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Test the {@link RollingFileSystemSink} class in the context of the local file
|
||||
* system.
|
||||
*/
|
||||
public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase {
|
||||
/**
|
||||
* Test writing logs to the local file system.
|
||||
* @throws Exception when things break
|
||||
*/
|
||||
@Test
|
||||
public void testWrite() throws Exception {
|
||||
String path = methodDir.getAbsolutePath();
|
||||
MetricsSystem ms = initMetricsSystem(path, false, false);
|
||||
|
||||
assertMetricsContents(doWriteTest(ms, path, 1));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test writing logs to the local file system with the sink set to ignore
|
||||
* errors.
|
||||
* @throws Exception when things break
|
||||
*/
|
||||
@Test
|
||||
public void testSilentWrite() throws Exception {
|
||||
String path = methodDir.getAbsolutePath();
|
||||
MetricsSystem ms = initMetricsSystem(path, true, false);
|
||||
|
||||
assertMetricsContents(doWriteTest(ms, path, 1));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test writing logs to HDFS when the log file already exists.
|
||||
*
|
||||
* @throws Exception when things break
|
||||
*/
|
||||
@Test
|
||||
public void testExistingWrite() throws Exception {
|
||||
String path = methodDir.getAbsolutePath();
|
||||
|
||||
assertMetricsContents(doAppendTest(path, false, false, 2));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test writing logs to HDFS when the log file and the .1 log file already
|
||||
* exist.
|
||||
*
|
||||
* @throws Exception when things break
|
||||
*/
|
||||
@Test
|
||||
public void testExistingWrite2() throws Exception {
|
||||
String path = methodDir.getAbsolutePath();
|
||||
MetricsSystem ms = initMetricsSystem(path, false, false);
|
||||
|
||||
preCreateLogFile(path, 2);
|
||||
|
||||
assertMetricsContents(doWriteTest(ms, path, 3));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test writing logs to HDFS with ignore errors enabled when
|
||||
* the log file already exists.
|
||||
*
|
||||
* @throws Exception when things break
|
||||
*/
|
||||
@Test
|
||||
public void testSilentExistingWrite() throws Exception {
|
||||
String path = methodDir.getAbsolutePath();
|
||||
|
||||
assertMetricsContents(doAppendTest(path, false, false, 2));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that writing fails when the directory isn't writable.
|
||||
*/
|
||||
@Test
|
||||
public void testFailedWrite() {
|
||||
String path = methodDir.getAbsolutePath();
|
||||
MetricsSystem ms = initMetricsSystem(path, false, false);
|
||||
|
||||
new MyMetrics1().registerWith(ms);
|
||||
|
||||
methodDir.setWritable(false);
|
||||
ErrorSink.errored = false;
|
||||
|
||||
try {
|
||||
// publish the metrics
|
||||
ms.publishMetricsNow();
|
||||
|
||||
assertTrue("No exception was generated while writing metrics "
|
||||
+ "even though the target directory was not writable",
|
||||
ErrorSink.errored);
|
||||
|
||||
ms.stop();
|
||||
ms.shutdown();
|
||||
} finally {
|
||||
// Make sure the dir is writable again so we can delete it at the end
|
||||
methodDir.setWritable(true);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that writing fails silently when the directory is not writable.
|
||||
*/
|
||||
@Test
|
||||
public void testSilentFailedWrite() {
|
||||
String path = methodDir.getAbsolutePath();
|
||||
MetricsSystem ms = initMetricsSystem(path, true, false);
|
||||
|
||||
new MyMetrics1().registerWith(ms);
|
||||
|
||||
methodDir.setWritable(false);
|
||||
ErrorSink.errored = false;
|
||||
|
||||
try {
|
||||
// publish the metrics
|
||||
ms.publishMetricsNow();
|
||||
|
||||
assertFalse("An exception was generated while writing metrics "
|
||||
+ "when the target directory was not writable, even though the "
|
||||
+ "sink is set to ignore errors",
|
||||
ErrorSink.errored);
|
||||
|
||||
ms.stop();
|
||||
ms.shutdown();
|
||||
} finally {
|
||||
// Make sure the dir is writable again so we can delete it at the end
|
||||
methodDir.setWritable(true);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue