HDFS-9782. RollingFileSystemSink should have configurable roll interval. (Daniel Templeton via kasha)

This commit is contained in:
Karthik Kambatla 2016-05-24 10:37:00 -07:00
parent b4078bd17b
commit 57c31a3fef
6 changed files with 767 additions and 286 deletions

View File

@ -31,6 +31,10 @@
import java.util.TimeZone; import java.util.TimeZone;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.configuration.SubsetConfiguration; import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.commons.lang.time.FastDateFormat; import org.apache.commons.lang.time.FastDateFormat;
@ -53,14 +57,14 @@
/** /**
* <p>This class is a metrics sink that uses * <p>This class is a metrics sink that uses
* {@link org.apache.hadoop.fs.FileSystem} to write the metrics logs. Every * {@link org.apache.hadoop.fs.FileSystem} to write the metrics logs. Every
* hour a new directory will be created under the path specified by the * roll interval a new directory will be created under the path specified by the
* <code>basepath</code> property. All metrics will be logged to a file in the * <code>basepath</code> property. All metrics will be logged to a file in the
* current hour's directory in a file named &lt;hostname&gt;.log, where * current interval's directory in a file named &lt;hostname&gt;.log, where
* &lt;hostname&gt; is the name of the host on which the metrics logging * &lt;hostname&gt; is the name of the host on which the metrics logging
* process is running. The base path is set by the * process is running. The base path is set by the
* <code>&lt;prefix&gt;.sink.&lt;instance&gt;.basepath</code> property. The * <code>&lt;prefix&gt;.sink.&lt;instance&gt;.basepath</code> property. The
* time zone used to create the current hour's directory name is GMT. If the * time zone used to create the current interval's directory name is GMT. If
* <code>basepath</code> property isn't specified, it will default to * the <code>basepath</code> property isn't specified, it will default to
* &quot;/tmp&quot;, which is the temp directory on whatever default file * &quot;/tmp&quot;, which is the temp directory on whatever default file
* system is configured for the cluster.</p> * system is configured for the cluster.</p>
* *
@ -69,6 +73,26 @@
* writing a log file. The default value is <code>true</code>. When set to * writing a log file. The default value is <code>true</code>. When set to
* <code>false</code>, file errors are quietly swallowed.</p> * <code>false</code>, file errors are quietly swallowed.</p>
* *
* <p>The <code>roll-interval</code> property sets the amount of time before
* rolling the directory. The default value is 1 hour. The roll interval may
* not be less than 1 minute. The property's value should be given as
* <i>number unit</i>, where <i>number</i> is an integer value, and
* <i>unit</i> is a valid unit. Valid units are <i>minute</i>, <i>hour</i>,
* and <i>day</i>. The units are case insensitive and may be abbreviated or
* plural. If no units are specified, hours are assumed. For example,
* &quot;2&quot;, &quot;2h&quot;, &quot;2 hour&quot;, and
* &quot;2 hours&quot; are all valid ways to specify two hours.</p>
*
* <p>The <code>roll-offset-interval-millis</code> property sets the upper
* bound on a random time interval (in milliseconds) that is used to delay
* before the initial roll. All subsequent rolls will happen an integer
* number of roll intervals after the initial roll, hence retaining the original
* offset. The purpose of this property is to insert some variance in the roll
* times so that large clusters using this sink on every node don't cause a
* performance impact on HDFS by rolling simultaneously. The default value is
* 30000 (30s). When writing to HDFS, as a rule of thumb, the roll offset in
* millis should be no less than the number of sink instances times 5.
*
* <p>The primary use of this class is for logging to HDFS. As it uses * <p>The primary use of this class is for logging to HDFS. As it uses
* {@link org.apache.hadoop.fs.FileSystem} to access the target file system, * {@link org.apache.hadoop.fs.FileSystem} to access the target file system,
* however, it can be used to write to the local file system, Amazon S3, or any * however, it can be used to write to the local file system, Amazon S3, or any
@ -79,7 +103,8 @@
* <p>Not all file systems support the ability to append to files. In file * <p>Not all file systems support the ability to append to files. In file
* systems without the ability to append to files, only one writer can write to * systems without the ability to append to files, only one writer can write to
* a file at a time. To allow for concurrent writes from multiple daemons on a * a file at a time. To allow for concurrent writes from multiple daemons on a
* single host, the <code>source</code> property should be set to the name of * single host, the <code>source</code> property is used to set unique headers
* for the log files. The property should be set to the name of
* the source daemon, e.g. <i>namenode</i>. The value of the * the source daemon, e.g. <i>namenode</i>. The value of the
* <code>source</code> property should typically be the same as the property's * <code>source</code> property should typically be the same as the property's
* prefix. If this property is not set, the source is taken to be * prefix. If this property is not set, the source is taken to be
@ -105,7 +130,7 @@
* 3.</p> * 3.</p>
* *
* <p>Note also that when writing to HDFS, the file size information is not * <p>Note also that when writing to HDFS, the file size information is not
* updated until the file is closed (e.g. at the top of the hour) even though * updated until the file is closed (at the end of the interval) even though
* the data is being written successfully. This is a known HDFS limitation that * the data is being written successfully. This is a known HDFS limitation that
* exists because of the performance cost of updating the metadata. See * exists because of the performance cost of updating the metadata. See
* <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.</p> * <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.</p>
@ -124,21 +149,32 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
private static final String BASEPATH_KEY = "basepath"; private static final String BASEPATH_KEY = "basepath";
private static final String SOURCE_KEY = "source"; private static final String SOURCE_KEY = "source";
private static final String IGNORE_ERROR_KEY = "ignore-error"; private static final String IGNORE_ERROR_KEY = "ignore-error";
private static final boolean DEFAULT_IGNORE_ERROR = false;
private static final String ALLOW_APPEND_KEY = "allow-append"; private static final String ALLOW_APPEND_KEY = "allow-append";
private static final boolean DEFAULT_ALLOW_APPEND = false;
private static final String KEYTAB_PROPERTY_KEY = "keytab-key"; private static final String KEYTAB_PROPERTY_KEY = "keytab-key";
private static final String USERNAME_PROPERTY_KEY = "principal-key"; private static final String USERNAME_PROPERTY_KEY = "principal-key";
private static final String ROLL_INTERVAL_KEY = "roll-interval";
private static final String DEFAULT_ROLL_INTERVAL = "1h";
private static final String ROLL_OFFSET_INTERVAL_MILLIS_KEY =
"roll-offset-interval-millis";
private static final int DEFAULT_ROLL_OFFSET_INTERVAL_MILLIS = 30000;
private static final String SOURCE_DEFAULT = "unknown"; private static final String SOURCE_DEFAULT = "unknown";
private static final String BASEPATH_DEFAULT = "/tmp"; private static final String BASEPATH_DEFAULT = "/tmp";
private static final FastDateFormat DATE_FORMAT = private static final FastDateFormat DATE_FORMAT =
FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT")); FastDateFormat.getInstance("yyyyMMddHHmm", TimeZone.getTimeZone("GMT"));
private final Object lock = new Object(); private final Object lock = new Object();
private boolean initialized = false; private boolean initialized = false;
private SubsetConfiguration properties; private SubsetConfiguration properties;
private Configuration conf; private Configuration conf;
private String source; @VisibleForTesting
private boolean ignoreError; protected String source;
private boolean allowAppend; @VisibleForTesting
private Path basePath; protected boolean ignoreError;
@VisibleForTesting
protected boolean allowAppend;
@VisibleForTesting
protected Path basePath;
private FileSystem fileSystem; private FileSystem fileSystem;
// The current directory path into which we're writing files // The current directory path into which we're writing files
private Path currentDirPath; private Path currentDirPath;
@ -149,11 +185,21 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
// We keep this only to be able to call hsynch() on it. // We keep this only to be able to call hsynch() on it.
private FSDataOutputStream currentFSOutStream; private FSDataOutputStream currentFSOutStream;
private Timer flushTimer; private Timer flushTimer;
// The amount of time between rolls
// This flag is used during testing to make the flusher thread run after only
// a short pause instead of waiting for the top of the hour.
@VisibleForTesting @VisibleForTesting
protected static boolean flushQuickly = false; protected long rollIntervalMillis;
// The maximum amount of random time to add to the initial roll
@VisibleForTesting
protected long rollOffsetIntervalMillis;
// The time for the nextFlush
@VisibleForTesting
protected Calendar nextFlush = null;
// This flag when true causes a metrics write to schedule a flush thread to
// run immediately, but only if a flush thread is already scheduled. (It's a
// timing thing. If the first write forces the flush, it will strand the
// second write.)
@VisibleForTesting
protected static boolean forceFlush = false;
// This flag is used by the flusher thread to indicate that it has run. Used // This flag is used by the flusher thread to indicate that it has run. Used
// only for testing purposes. // only for testing purposes.
@VisibleForTesting @VisibleForTesting
@ -165,13 +211,36 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
@VisibleForTesting @VisibleForTesting
protected static FileSystem suppliedFilesystem = null; protected static FileSystem suppliedFilesystem = null;
/**
* Create an empty instance. Required for reflection.
*/
public RollingFileSystemSink() {
}
/**
* Create an instance for testing.
*
* @param flushIntervalMillis the roll interval in millis
* @param flushOffsetIntervalMillis the roll offset interval in millis
*/
@VisibleForTesting
protected RollingFileSystemSink(long flushIntervalMillis,
long flushOffsetIntervalMillis) {
this.rollIntervalMillis = flushIntervalMillis;
this.rollOffsetIntervalMillis = flushOffsetIntervalMillis;
}
@Override @Override
public void init(SubsetConfiguration metrics2Properties) { public void init(SubsetConfiguration metrics2Properties) {
properties = metrics2Properties; properties = metrics2Properties;
basePath = new Path(properties.getString(BASEPATH_KEY, BASEPATH_DEFAULT)); basePath = new Path(properties.getString(BASEPATH_KEY, BASEPATH_DEFAULT));
source = properties.getString(SOURCE_KEY, SOURCE_DEFAULT); source = properties.getString(SOURCE_KEY, SOURCE_DEFAULT);
ignoreError = properties.getBoolean(IGNORE_ERROR_KEY, false); ignoreError = properties.getBoolean(IGNORE_ERROR_KEY, DEFAULT_IGNORE_ERROR);
allowAppend = properties.getBoolean(ALLOW_APPEND_KEY, false); allowAppend = properties.getBoolean(ALLOW_APPEND_KEY, DEFAULT_ALLOW_APPEND);
rollOffsetIntervalMillis =
getNonNegative(ROLL_OFFSET_INTERVAL_MILLIS_KEY,
DEFAULT_ROLL_OFFSET_INTERVAL_MILLIS);
rollIntervalMillis = getRollInterval();
conf = loadConf(); conf = loadConf();
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
@ -179,8 +248,8 @@ public void init(SubsetConfiguration metrics2Properties) {
// Don't do secure setup if it's not needed. // Don't do secure setup if it's not needed.
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
// Validate config so that we don't get an NPE // Validate config so that we don't get an NPE
checkForProperty(properties, KEYTAB_PROPERTY_KEY); checkIfPropertyExists(KEYTAB_PROPERTY_KEY);
checkForProperty(properties, USERNAME_PROPERTY_KEY); checkIfPropertyExists(USERNAME_PROPERTY_KEY);
try { try {
@ -228,6 +297,7 @@ private boolean initFs() {
} }
flushTimer = new Timer("RollingFileSystemSink Flusher", true); flushTimer = new Timer("RollingFileSystemSink Flusher", true);
setInitialFlushTime(new Date());
} }
return success; return success;
@ -238,8 +308,6 @@ private boolean initFs() {
* strings, allowing for either the property or the configuration not to be * strings, allowing for either the property or the configuration not to be
* set. * set.
* *
* @param properties the sink properties
* @param conf the conf
* @param property the property to stringify * @param property the property to stringify
* @return the stringified property * @return the stringified property
*/ */
@ -264,15 +332,98 @@ private String stringifySecurityProperty(String property) {
return securityProperty; return securityProperty;
} }
/**
* Extract the roll interval from the configuration and return it in
* milliseconds.
*
* @return the roll interval in millis
*/
@VisibleForTesting
protected long getRollInterval() {
String rollInterval =
properties.getString(ROLL_INTERVAL_KEY, DEFAULT_ROLL_INTERVAL);
Pattern pattern = Pattern.compile("^\\s*(\\d+)\\s*([A-Za-z]*)\\s*$");
Matcher match = pattern.matcher(rollInterval);
long millis;
if (match.matches()) {
String flushUnit = match.group(2);
int rollIntervalInt;
try {
rollIntervalInt = Integer.parseInt(match.group(1));
} catch (NumberFormatException ex) {
throw new MetricsException("Unrecognized flush interval: "
+ rollInterval + ". Must be a number followed by an optional "
+ "unit. The unit must be one of: minute, hour, day", ex);
}
if ("".equals(flushUnit)) {
millis = TimeUnit.HOURS.toMillis(rollIntervalInt);
} else {
switch (flushUnit.toLowerCase()) {
case "m":
case "min":
case "minute":
case "minutes":
millis = TimeUnit.MINUTES.toMillis(rollIntervalInt);
break;
case "h":
case "hr":
case "hour":
case "hours":
millis = TimeUnit.HOURS.toMillis(rollIntervalInt);
break;
case "d":
case "day":
case "days":
millis = TimeUnit.DAYS.toMillis(rollIntervalInt);
break;
default:
throw new MetricsException("Unrecognized unit for flush interval: "
+ flushUnit + ". Must be one of: minute, hour, day");
}
}
} else {
throw new MetricsException("Unrecognized flush interval: "
+ rollInterval + ". Must be a number followed by an optional unit."
+ " The unit must be one of: minute, hour, day");
}
if (millis < 60000) {
throw new MetricsException("The flush interval property must be "
+ "at least 1 minute. Value was " + rollInterval);
}
return millis;
}
/**
* Return the property value if it's non-negative and throw an exception if
* it's not.
*
* @param key the property key
* @param defaultValue the default value
*/
private long getNonNegative(String key, int defaultValue) {
int flushOffsetIntervalMillis = properties.getInt(key, defaultValue);
if (flushOffsetIntervalMillis < 0) {
throw new MetricsException("The " + key + " property must be "
+ "non-negative. Value was " + flushOffsetIntervalMillis);
}
return flushOffsetIntervalMillis;
}
/** /**
* Throw a {@link MetricsException} if the given property is not set. * Throw a {@link MetricsException} if the given property is not set.
* *
* @param conf the configuration to test
* @param key the key to validate * @param key the key to validate
*/ */
private static void checkForProperty(SubsetConfiguration conf, String key) { private void checkIfPropertyExists(String key) {
if (!conf.containsKey(key)) { if (!properties.containsKey(key)) {
throw new MetricsException("Configuration is missing " + key throw new MetricsException("Metrics2 configuration is missing " + key
+ " property"); + " property");
} }
} }
@ -301,7 +452,6 @@ private Configuration loadConf() {
* Return the supplied file system for testing or otherwise get a new file * Return the supplied file system for testing or otherwise get a new file
* system. * system.
* *
* @param conf the configuration
* @return the file system to use * @return the file system to use
* @throws MetricsException thrown if the file system could not be retrieved * @throws MetricsException thrown if the file system could not be retrieved
*/ */
@ -327,6 +477,7 @@ private FileSystem getFileSystem() throws MetricsException {
/** /**
* Test whether the file system supports append and return the answer. * Test whether the file system supports append and return the answer.
*
* @param fs the target file system * @param fs the target file system
*/ */
private boolean checkAppend(FileSystem fs) { private boolean checkAppend(FileSystem fs) {
@ -351,14 +502,14 @@ private boolean checkAppend(FileSystem fs) {
* new directory or new log file * new directory or new log file
*/ */
private void rollLogDirIfNeeded() throws MetricsException { private void rollLogDirIfNeeded() throws MetricsException {
// Because we're working relative to the clock, we use a Date instead
// of Time.monotonicNow().
Date now = new Date(); Date now = new Date();
String currentDir = DATE_FORMAT.format(now);
Path path = new Path(basePath, currentDir);
// We check whether currentOutStream is null instead of currentDirPath, // We check whether currentOutStream is null instead of currentDirPath,
// because if currentDirPath is null, then currentOutStream is null, but // because if currentDirPath is null, then currentOutStream is null, but
// currentOutStream can be null for other reasons. // currentOutStream can be null for other reasons. Same for nextFlush.
if ((currentOutStream == null) || !path.equals(currentDirPath)) { if ((currentOutStream == null) || now.after(nextFlush.getTime())) {
// If we're not yet connected to HDFS, create the connection // If we're not yet connected to HDFS, create the connection
if (!initialized) { if (!initialized) {
initialized = initFs(); initialized = initFs();
@ -372,7 +523,7 @@ private void rollLogDirIfNeeded() throws MetricsException {
currentOutStream.close(); currentOutStream.close();
} }
currentDirPath = path; currentDirPath = findCurrentDirectory(now);
try { try {
rollLogDir(); rollLogDir();
@ -380,34 +531,41 @@ private void rollLogDirIfNeeded() throws MetricsException {
throwMetricsException("Failed to create new log file", ex); throwMetricsException("Failed to create new log file", ex);
} }
scheduleFlush(now); // Update the time of the next flush
updateFlushTime(now);
// Schedule the next flush at that time
scheduleFlush(nextFlush.getTime());
} }
} else if (forceFlush) {
scheduleFlush(new Date());
} }
} }
/** /**
* Schedule the current hour's directory to be flushed at the top of the next * Use the given time to determine the current directory. The current
* hour. If this ends up running after the top of the next hour, it will * directory will be based on the {@link #rollIntervalMinutes}.
* execute immediately.
* *
* @param now the current time * @param now the current time
* @return the current directory
*/ */
private void scheduleFlush(Date now) { private Path findCurrentDirectory(Date now) {
long offset = ((now.getTime() - nextFlush.getTimeInMillis())
/ rollIntervalMillis) * rollIntervalMillis;
String currentDir =
DATE_FORMAT.format(new Date(nextFlush.getTimeInMillis() + offset));
return new Path(basePath, currentDir);
}
/**
* Schedule the current interval's directory to be flushed. If this ends up
* running after the top of the next interval, it will execute immediately.
*
* @param when the time the thread should run
*/
private void scheduleFlush(Date when) {
// Store the current currentDirPath to close later // Store the current currentDirPath to close later
final PrintStream toClose = currentOutStream; final PrintStream toClose = currentOutStream;
Calendar next = Calendar.getInstance();
next.setTime(now);
if (flushQuickly) {
// If we're running unit tests, flush after a short pause
next.add(Calendar.MILLISECOND, 400);
} else {
// Otherwise flush at the top of the hour
next.set(Calendar.SECOND, 0);
next.set(Calendar.MINUTE, 0);
next.add(Calendar.HOUR, 1);
}
flushTimer.schedule(new TimerTask() { flushTimer.schedule(new TimerTask() {
@Override @Override
@ -420,11 +578,81 @@ public void run() {
hasFlushed = true; hasFlushed = true;
} }
}, next.getTime()); }, when);
} }
/** /**
* Create a new directory based on the current hour and a new log file in * Update the {@link #nextFlush} variable to the next flush time. Add
* an integer number of flush intervals, preserving the initial random offset.
*
* @param now the current time
*/
@VisibleForTesting
protected void updateFlushTime(Date now) {
// In non-initial rounds, add an integer number of intervals to the last
// flush until a time in the future is achieved, thus preserving the
// original random offset.
int millis =
(int) (((now.getTime() - nextFlush.getTimeInMillis())
/ rollIntervalMillis + 1) * rollIntervalMillis);
nextFlush.add(Calendar.MILLISECOND, millis);
}
/**
* Set the {@link #nextFlush} variable to the initial flush time. The initial
* flush will be an integer number of flush intervals past the beginning of
* the current hour and will have a random offset added, up to
* {@link #rollOffsetIntervalMillis}. The initial flush will be a time in
* past that can be used from which to calculate future flush times.
*
* @param now the current time
*/
@VisibleForTesting
protected void setInitialFlushTime(Date now) {
// Start with the beginning of the current hour
nextFlush = Calendar.getInstance();
nextFlush.setTime(now);
nextFlush.set(Calendar.MILLISECOND, 0);
nextFlush.set(Calendar.SECOND, 0);
nextFlush.set(Calendar.MINUTE, 0);
// In the first round, calculate the first flush as the largest number of
// intervals from the beginning of the current hour that's not in the
// future by:
// 1. Subtract the beginning of the hour from the current time
// 2. Divide by the roll interval and round down to get the number of whole
// intervals that have passed since the beginning of the hour
// 3. Multiply by the roll interval to get the number of millis between
// the beginning of the current hour and the beginning of the current
// interval.
int millis = (int) (((now.getTime() - nextFlush.getTimeInMillis())
/ rollIntervalMillis) * rollIntervalMillis);
// Then add some noise to help prevent all the nodes from
// closing their files at the same time.
if (rollOffsetIntervalMillis > 0) {
millis += ThreadLocalRandom.current().nextLong(rollOffsetIntervalMillis);
// If the added time puts us into the future, step back one roll interval
// because the code to increment nextFlush to the next flush expects that
// nextFlush is the next flush from the previous interval. There wasn't
// a previous interval, so we just fake it with the time in the past that
// would have been the previous interval if there had been one.
//
// It's OK if millis comes out negative.
while (nextFlush.getTimeInMillis() + millis > now.getTime()) {
millis -= rollIntervalMillis;
}
}
// Adjust the next flush time by millis to get the time of our ficticious
// previous next flush
nextFlush.add(Calendar.MILLISECOND, millis);
}
/**
* Create a new directory based on the current interval and a new log file in
* that directory. * that directory.
* *
* @throws IOException thrown if an error occurs while creating the * @throws IOException thrown if an error occurs while creating the
@ -451,7 +679,8 @@ private void rollLogDir() throws IOException {
* path is found. * path is found.
* *
* Once the file is open, update {@link #currentFSOutStream}, * Once the file is open, update {@link #currentFSOutStream},
* {@link #currentOutStream}, and {@#link #currentFile} are set appropriately. * {@link #currentOutStream}, and {@#link #currentFilePath} are set
* appropriately.
* *
* @param initial the target path * @param initial the target path
* @throws IOException thrown if the call to see if the exists fails * @throws IOException thrown if the call to see if the exists fails
@ -552,7 +781,7 @@ private int extractId(String file) {
* instead. * instead.
* *
* Once the file is open, update {@link #currentFSOutStream}, * Once the file is open, update {@link #currentFSOutStream},
* {@link #currentOutStream}, and {@#link #currentFile} are set appropriately. * {@link #currentOutStream}, and {@#link #currentFilePath}.
* *
* @param initial the target path * @param initial the target path
* @throws IOException thrown if the call to see the append operation fails. * @throws IOException thrown if the call to see the append operation fails.
@ -615,9 +844,9 @@ public void putMetrics(MetricsRecord record) {
currentOutStream.println(); currentOutStream.println();
// If we don't hflush(), the data may not be written until the file is // If we don't hflush(), the data may not be written until the file is
// closed. The file won't be closed until the top of the hour *AND* // closed. The file won't be closed until the end of the interval *AND*
// another record is received. Calling hflush() makes sure that the data // another record is received. Calling hflush() makes sure that the data
// is complete at the top of the hour. // is complete at the end of the interval.
try { try {
currentFSOutStream.hflush(); currentFSOutStream.hflush();
} catch (IOException ex) { } catch (IOException ex) {
@ -668,8 +897,8 @@ public void close() {
* as the new exception's message with the current file name * as the new exception's message with the current file name
* ({@link #currentFilePath}) appended to it. * ({@link #currentFilePath}) appended to it.
* *
* @param message the exception message. The message will have the current * @param message the exception message. The message will have a colon and
* file name ({@link #currentFilePath}) appended to it. * the current file name ({@link #currentFilePath}) appended to it.
* @throws MetricsException thrown if there was an error and the sink isn't * @throws MetricsException thrown if there was an error and the sink isn't
* ignoring errors * ignoring errors
*/ */
@ -687,9 +916,9 @@ private void checkForErrors(String message)
* ({@link #currentFilePath}) and the Throwable's string representation * ({@link #currentFilePath}) and the Throwable's string representation
* appended to it. * appended to it.
* *
* @param message the exception message. The message will have the current * @param message the exception message. The message will have a colon, the
* file name ({@link #currentFilePath}) and the Throwable's string * current file name ({@link #currentFilePath}), and the Throwable's string
* representation appended to it. * representation (wrapped in square brackets) appended to it.
* @param t the Throwable to wrap * @param t the Throwable to wrap
*/ */
private void throwMetricsException(String message, Throwable t) { private void throwMetricsException(String message, Throwable t) {
@ -705,8 +934,8 @@ private void throwMetricsException(String message, Throwable t) {
* new exception's message with the current file name * new exception's message with the current file name
* ({@link #currentFilePath}) appended to it. * ({@link #currentFilePath}) appended to it.
* *
* @param message the exception message. The message will have the current * @param message the exception message. The message will have a colon and
* file name ({@link #currentFilePath}) appended to it. * the current file name ({@link #currentFilePath}) appended to it.
*/ */
private void throwMetricsException(String message) { private void throwMetricsException(String message) {
if (!ignoreError) { if (!ignoreError) {

View File

@ -180,7 +180,9 @@ protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors,
.add(prefix + ".sink.mysink0.source", "testsrc") .add(prefix + ".sink.mysink0.source", "testsrc")
.add(prefix + ".sink.mysink0.context", "test1") .add(prefix + ".sink.mysink0.context", "test1")
.add(prefix + ".sink.mysink0.ignore-error", ignoreErrors) .add(prefix + ".sink.mysink0.ignore-error", ignoreErrors)
.add(prefix + ".sink.mysink0.allow-append", allowAppend); .add(prefix + ".sink.mysink0.allow-append", allowAppend)
.add(prefix + ".sink.mysink0.roll-offset-interval-millis", 0)
.add(prefix + ".sink.mysink0.roll-interval", "1h");
if (useSecureParams) { if (useSecureParams) {
builder.add(prefix + ".sink.mysink0.keytab-key", SINK_KEYTAB_FILE_KEY) builder.add(prefix + ".sink.mysink0.keytab-key", SINK_KEYTAB_FILE_KEY)
@ -210,7 +212,7 @@ protected MetricsSystem initMetricsSystem(String path, boolean ignoreErrors,
*/ */
protected String doWriteTest(MetricsSystem ms, String path, int count) protected String doWriteTest(MetricsSystem ms, String path, int count)
throws IOException, URISyntaxException { throws IOException, URISyntaxException {
final String then = DATE_FORMAT.format(new Date()); final String then = DATE_FORMAT.format(new Date()) + "00";
MyMetrics1 mm1 = new MyMetrics1().registerWith(ms); MyMetrics1 mm1 = new MyMetrics1().registerWith(ms);
new MyMetrics2().registerWith(ms); new MyMetrics2().registerWith(ms);
@ -239,7 +241,7 @@ protected String doWriteTest(MetricsSystem ms, String path, int count)
*/ */
protected String readLogFile(String path, String then, int count) protected String readLogFile(String path, String then, int count)
throws IOException, URISyntaxException { throws IOException, URISyntaxException {
final String now = DATE_FORMAT.format(new Date()); final String now = DATE_FORMAT.format(new Date()) + "00";
final String logFile = getLogFilename(); final String logFile = getLogFilename();
FileSystem fs = FileSystem.get(new URI(path), new Configuration()); FileSystem fs = FileSystem.get(new URI(path), new Configuration());
StringBuilder metrics = new StringBuilder(); StringBuilder metrics = new StringBuilder();
@ -426,7 +428,7 @@ protected void preCreateLogFile(String path, int numFiles)
Calendar now = getNowNotTopOfHour(); Calendar now = getNowNotTopOfHour();
FileSystem fs = FileSystem.get(new URI(path), new Configuration()); FileSystem fs = FileSystem.get(new URI(path), new Configuration());
Path dir = new Path(path, DATE_FORMAT.format(now.getTime())); Path dir = new Path(path, DATE_FORMAT.format(now.getTime()) + "00");
fs.mkdirs(dir); fs.mkdirs(dir);
@ -494,8 +496,8 @@ public void assertFileCount(FileSystem fs, Path dir, int expected)
} }
assertTrue("The sink created additional unexpected log files. " + count assertTrue("The sink created additional unexpected log files. " + count
+ "files were created", expected >= count); + " files were created", expected >= count);
assertTrue("The sink created too few log files. " + count + "files were " assertTrue("The sink created too few log files. " + count + " files were "
+ "created", expected <= count); + "created", expected <= count);
} }

View File

@ -18,141 +18,244 @@
package org.apache.hadoop.metrics2.sink; package org.apache.hadoop.metrics2.sink;
import org.apache.hadoop.metrics2.MetricsSystem; import java.util.Calendar;
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.impl.ConfigBuilder;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** /**
* Test the {@link RollingFileSystemSink} class in the context of the local file * Test that the init() method picks up all the configuration settings
* system. * correctly.
*/
public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase {
/**
* Test writing logs to the local file system.
* @throws Exception when things break
*/ */
public class TestRollingFileSystemSink {
@Test @Test
public void testWrite() throws Exception { public void testInit() {
String path = methodDir.getAbsolutePath(); ConfigBuilder builder = new ConfigBuilder();
MetricsSystem ms = initMetricsSystem(path, false, false); SubsetConfiguration conf =
builder.add("sink.roll-interval", "10m")
.add("sink.roll-offset-interval-millis", "1")
.add("sink.basepath", "path")
.add("sink.ignore-error", "true")
.add("sink.allow-append", "true")
.add("sink.source", "src")
.subset("sink");
assertMetricsContents(doWriteTest(ms, path, 1)); RollingFileSystemSink sink = new RollingFileSystemSink();
sink.init(conf);
assertEquals("The roll interval was not set correctly",
sink.rollIntervalMillis, 600000);
assertEquals("The roll offset interval was not set correctly",
sink.rollOffsetIntervalMillis, 1);
assertEquals("The base path was not set correctly",
sink.basePath, new Path("path"));
assertEquals("ignore-error was not set correctly",
sink.ignoreError, true);
assertEquals("allow-append was not set correctly",
sink.allowAppend, true);
assertEquals("The source was not set correctly",
sink.source, "src");
} }
/** /**
* Test writing logs to the local file system with the sink set to ignore * Test whether the initial roll interval is set correctly.
* errors.
* @throws Exception when things break
*/ */
@Test @Test
public void testSilentWrite() throws Exception { public void testSetInitialFlushTime() {
String path = methodDir.getAbsolutePath(); RollingFileSystemSink rfsSink = new RollingFileSystemSink(1000, 0);
MetricsSystem ms = initMetricsSystem(path, true, false); Calendar calendar = Calendar.getInstance();
assertMetricsContents(doWriteTest(ms, path, 1)); calendar.set(Calendar.MILLISECOND, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.HOUR, 0);
calendar.set(Calendar.DAY_OF_YEAR, 1);
calendar.set(Calendar.YEAR, 2016);
assertNull("Last flush time should have been null prior to calling init()",
rfsSink.nextFlush);
rfsSink.setInitialFlushTime(calendar.getTime());
long diff =
rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis();
assertEquals("The initial flush time was calculated incorrectly", 0L, diff);
calendar.set(Calendar.MILLISECOND, 10);
rfsSink.setInitialFlushTime(calendar.getTime());
diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis();
assertEquals("The initial flush time was calculated incorrectly",
-10L, diff);
calendar.set(Calendar.SECOND, 1);
calendar.set(Calendar.MILLISECOND, 10);
rfsSink.setInitialFlushTime(calendar.getTime());
diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis();
assertEquals("The initial flush time was calculated incorrectly",
-10L, diff);
// Try again with a random offset
rfsSink = new RollingFileSystemSink(1000, 100);
assertNull("Last flush time should have been null prior to calling init()",
rfsSink.nextFlush);
calendar.set(Calendar.MILLISECOND, 0);
calendar.set(Calendar.SECOND, 0);
rfsSink.setInitialFlushTime(calendar.getTime());
diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis();
assertTrue("The initial flush time was calculated incorrectly: " + diff,
(diff >= -1000L) && (diff < -900L));
calendar.set(Calendar.MILLISECOND, 10);
rfsSink.setInitialFlushTime(calendar.getTime());
diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis();
assertTrue("The initial flush time was calculated incorrectly: " + diff,
((diff >= -10L) && (diff <= 0L) ||
((diff > -1000L) && (diff < -910L))));
calendar.set(Calendar.SECOND, 1);
calendar.set(Calendar.MILLISECOND, 10);
rfsSink.setInitialFlushTime(calendar.getTime());
diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis();
assertTrue("The initial flush time was calculated incorrectly: " + diff,
((diff >= -10L) && (diff <= 0L) ||
((diff > -1000L) && (diff < -910L))));
// Now try pathological settings
rfsSink = new RollingFileSystemSink(1000, 1000000);
assertNull("Last flush time should have been null prior to calling init()",
rfsSink.nextFlush);
calendar.set(Calendar.MILLISECOND, 1);
calendar.set(Calendar.SECOND, 0);
rfsSink.setInitialFlushTime(calendar.getTime());
diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis();
assertTrue("The initial flush time was calculated incorrectly: " + diff,
(diff > -1000L) && (diff <= 0L));
} }
/** /**
* Test writing logs to HDFS when the log file already exists. * Test that the roll time updates correctly.
*
* @throws Exception when things break
*/ */
@Test @Test
public void testExistingWrite() throws Exception { public void testUpdateRollTime() {
String path = methodDir.getAbsolutePath(); RollingFileSystemSink rfsSink = new RollingFileSystemSink(1000, 0);
Calendar calendar = Calendar.getInstance();
assertMetricsContents(doAppendTest(path, false, false, 2)); calendar.set(Calendar.MILLISECOND, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.HOUR, 0);
calendar.set(Calendar.DAY_OF_YEAR, 1);
calendar.set(Calendar.YEAR, 2016);
rfsSink.nextFlush = Calendar.getInstance();
rfsSink.nextFlush.setTime(calendar.getTime());
rfsSink.updateFlushTime(calendar.getTime());
assertEquals("The next roll time should have been 1 second in the future",
calendar.getTimeInMillis() + 1000,
rfsSink.nextFlush.getTimeInMillis());
rfsSink.nextFlush.setTime(calendar.getTime());
calendar.add(Calendar.MILLISECOND, 10);
rfsSink.updateFlushTime(calendar.getTime());
assertEquals("The next roll time should have been 990 ms in the future",
calendar.getTimeInMillis() + 990,
rfsSink.nextFlush.getTimeInMillis());
rfsSink.nextFlush.setTime(calendar.getTime());
calendar.add(Calendar.SECOND, 2);
calendar.add(Calendar.MILLISECOND, 10);
rfsSink.updateFlushTime(calendar.getTime());
assertEquals("The next roll time should have been 990 ms in the future",
calendar.getTimeInMillis() + 990,
rfsSink.nextFlush.getTimeInMillis());
} }
/** /**
* Test writing logs to HDFS when the log file and the .1 log file already * Test whether the roll interval is correctly calculated from the
* exist. * configuration settings.
*
* @throws Exception when things break
*/ */
@Test @Test
public void testExistingWrite2() throws Exception { public void testGetRollInterval() {
String path = methodDir.getAbsolutePath(); doTestGetRollInterval(1, new String[] {"m", "min", "minute", "minutes"},
MetricsSystem ms = initMetricsSystem(path, false, false); 60 * 1000L);
doTestGetRollInterval(1, new String[] {"h", "hr", "hour", "hours"},
60 * 60 * 1000L);
doTestGetRollInterval(1, new String[] {"d", "day", "days"},
24 * 60 * 60 * 1000L);
preCreateLogFile(path, 2); ConfigBuilder builder = new ConfigBuilder();
SubsetConfiguration conf =
builder.add("sink.roll-interval", "1").subset("sink");
// We can reuse the same sink evry time because we're setting the same
// property every time.
RollingFileSystemSink sink = new RollingFileSystemSink();
assertMetricsContents(doWriteTest(ms, path, 3)); sink.init(conf);
}
/** assertEquals(3600000L, sink.getRollInterval());
* Test writing logs to HDFS with ignore errors enabled when
* the log file already exists.
*
* @throws Exception when things break
*/
@Test
public void testSilentExistingWrite() throws Exception {
String path = methodDir.getAbsolutePath();
assertMetricsContents(doAppendTest(path, false, false, 2)); for (char c : "abcefgijklnopqrtuvwxyz".toCharArray()) {
} builder = new ConfigBuilder();
conf = builder.add("sink.roll-interval", "90 " + c).subset("sink");
/**
* Test that writing fails when the directory isn't writable.
*/
@Test
public void testFailedWrite() {
String path = methodDir.getAbsolutePath();
MetricsSystem ms = initMetricsSystem(path, false, false);
new MyMetrics1().registerWith(ms);
methodDir.setWritable(false);
MockSink.errored = false;
try { try {
// publish the metrics sink.init(conf);
ms.publishMetricsNow(); sink.getRollInterval();
fail("Allowed flush interval with bad units: " + c);
assertTrue("No exception was generated while writing metrics " } catch (MetricsException ex) {
+ "even though the target directory was not writable", // Expected
MockSink.errored); }
ms.stop();
ms.shutdown();
} finally {
// Make sure the dir is writable again so we can delete it at the end
methodDir.setWritable(true);
} }
} }
/** /**
* Test that writing fails silently when the directory is not writable. * Test the basic unit conversions with the given unit name modifier applied.
*
* @param mod a unit name modifier
*/ */
@Test private void doTestGetRollInterval(int num, String[] units, long expected) {
public void testSilentFailedWrite() { RollingFileSystemSink sink = new RollingFileSystemSink();
String path = methodDir.getAbsolutePath(); ConfigBuilder builder = new ConfigBuilder();
MetricsSystem ms = initMetricsSystem(path, true, false);
new MyMetrics1().registerWith(ms); for (String unit : units) {
sink.init(builder.add("sink.roll-interval", num + unit).subset("sink"));
assertEquals(expected, sink.getRollInterval());
methodDir.setWritable(false); sink.init(builder.add("sink.roll-interval",
MockSink.errored = false; num + unit.toUpperCase()).subset("sink"));
assertEquals(expected, sink.getRollInterval());
try { sink.init(builder.add("sink.roll-interval",
// publish the metrics num + " " + unit).subset("sink"));
ms.publishMetricsNow(); assertEquals(expected, sink.getRollInterval());
assertFalse("An exception was generated while writing metrics " sink.init(builder.add("sink.roll-interval",
+ "when the target directory was not writable, even though the " num + " " + unit.toUpperCase()).subset("sink"));
+ "sink is set to ignore errors", assertEquals(expected, sink.getRollInterval());
MockSink.errored);
ms.stop();
ms.shutdown();
} finally {
// Make sure the dir is writable again so we can delete it at the end
methodDir.setWritable(true);
} }
} }
} }

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.sink;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Test the {@link RollingFileSystemSink} class in the context of the local file
* system.
*/
public class TestRollingFileSystemSinkWithLocal
extends RollingFileSystemSinkTestBase {
/**
* Test writing logs to the local file system.
* @throws Exception when things break
*/
@Test
public void testWrite() throws Exception {
String path = methodDir.getAbsolutePath();
MetricsSystem ms = initMetricsSystem(path, false, false);
assertMetricsContents(doWriteTest(ms, path, 1));
}
/**
* Test writing logs to the local file system with the sink set to ignore
* errors.
* @throws Exception when things break
*/
@Test
public void testSilentWrite() throws Exception {
String path = methodDir.getAbsolutePath();
MetricsSystem ms = initMetricsSystem(path, true, false);
assertMetricsContents(doWriteTest(ms, path, 1));
}
/**
* Test writing logs to HDFS when the log file already exists.
*
* @throws Exception when things break
*/
@Test
public void testExistingWrite() throws Exception {
String path = methodDir.getAbsolutePath();
assertMetricsContents(doAppendTest(path, false, false, 2));
}
/**
* Test writing logs to HDFS when the log file and the .1 log file already
* exist.
*
* @throws Exception when things break
*/
@Test
public void testExistingWrite2() throws Exception {
String path = methodDir.getAbsolutePath();
MetricsSystem ms = initMetricsSystem(path, false, false);
preCreateLogFile(path, 2);
assertMetricsContents(doWriteTest(ms, path, 3));
}
/**
* Test writing logs to HDFS with ignore errors enabled when
* the log file already exists.
*
* @throws Exception when things break
*/
@Test
public void testSilentExistingWrite() throws Exception {
String path = methodDir.getAbsolutePath();
assertMetricsContents(doAppendTest(path, false, false, 2));
}
/**
* Test that writing fails when the directory isn't writable.
*/
@Test
public void testFailedWrite() {
String path = methodDir.getAbsolutePath();
MetricsSystem ms = initMetricsSystem(path, false, false);
new MyMetrics1().registerWith(ms);
methodDir.setWritable(false);
MockSink.errored = false;
try {
// publish the metrics
ms.publishMetricsNow();
assertTrue("No exception was generated while writing metrics "
+ "even though the target directory was not writable",
MockSink.errored);
ms.stop();
ms.shutdown();
} finally {
// Make sure the dir is writable again so we can delete it at the end
methodDir.setWritable(true);
}
}
/**
* Test that writing fails silently when the directory is not writable.
*/
@Test
public void testSilentFailedWrite() {
String path = methodDir.getAbsolutePath();
MetricsSystem ms = initMetricsSystem(path, true, false);
new MyMetrics1().registerWith(ms);
methodDir.setWritable(false);
MockSink.errored = false;
try {
// publish the metrics
ms.publishMetricsNow();
assertFalse("An exception was generated while writing metrics "
+ "when the target directory was not writable, even though the "
+ "sink is set to ignore errors",
MockSink.errored);
ms.stop();
ms.shutdown();
} finally {
// Make sure the dir is writable again so we can delete it at the end
methodDir.setWritable(true);
}
}
}

View File

@ -20,8 +20,6 @@
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import org.junit.After;
import org.junit.Before;
import java.util.Calendar; import java.util.Calendar;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -30,9 +28,12 @@
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.sink.RollingFileSystemSinkTestBase.MyMetrics1; import org.apache.hadoop.metrics2.sink.RollingFileSystemSinkTestBase.MyMetrics1;
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** /**
* Test the {@link RollingFileSystemSink} class in the context of HDFS. * Test the {@link RollingFileSystemSink} class in the context of HDFS.
@ -58,7 +59,6 @@ public void setupHdfs() throws IOException {
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build(); new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
// Also clear sink flags // Also clear sink flags
RollingFileSystemSink.flushQuickly = false;
RollingFileSystemSink.hasFlushed = false; RollingFileSystemSink.hasFlushed = false;
} }
@ -251,10 +251,12 @@ public void testSilentFailedClose() throws IOException {
*/ */
@Test @Test
public void testFlushThread() throws Exception { public void testFlushThread() throws Exception {
RollingFileSystemSink.flushQuickly = true; // Cause the sink's flush thread to be run immediately after the second
// metrics log is written
RollingFileSystemSink.forceFlush = true;
String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp"; String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
MetricsSystem ms = initMetricsSystem(path, true, false); MetricsSystem ms = initMetricsSystem(path, true, false, false);
new MyMetrics1().registerWith(ms); new MyMetrics1().registerWith(ms);
@ -264,14 +266,21 @@ public void testFlushThread() throws Exception {
// regardless. // regardless.
ms.publishMetricsNow(); ms.publishMetricsNow();
// Sleep until the flusher has run int count = 0;
// Sleep until the flusher has run. This should never actually need to
// sleep, but the sleep is here to make sure this test isn't flakey.
while (!RollingFileSystemSink.hasFlushed) { while (!RollingFileSystemSink.hasFlushed) {
Thread.sleep(50L); Thread.sleep(10L);
if (++count > 1000) {
fail("Flush thread did not run within 10 seconds");
}
} }
Calendar now = getNowNotTopOfHour(); Calendar now = Calendar.getInstance();
Path currentDir = new Path(path, DATE_FORMAT.format(now.getTime()) + "00");
FileSystem fs = FileSystem.newInstance(new URI(path), new Configuration()); FileSystem fs = FileSystem.newInstance(new URI(path), new Configuration());
Path currentDir = new Path(path, DATE_FORMAT.format(now.getTime()));
Path currentFile = Path currentFile =
findMostRecentLogFile(fs, new Path(currentDir, getLogFilename())); findMostRecentLogFile(fs, new Path(currentDir, getLogFilename()));
FileStatus status = fs.getFileStatus(currentFile); FileStatus status = fs.getFileStatus(currentFile);

View File

@ -49,8 +49,11 @@
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertTrue; import org.junit.Before;
import org.junit.BeforeClass;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
/** /**
@ -66,6 +69,75 @@ public class TestRollingFileSystemSinkWithSecureHdfs
private static String hdfsPrincipal; private static String hdfsPrincipal;
private static String hdfsKeytab; private static String hdfsKeytab;
private static String spnegoPrincipal; private static String spnegoPrincipal;
private MiniDFSCluster cluster = null;
private UserGroupInformation sink = null;
/**
* Setup the KDC for testing a secure HDFS cluster.
*
* @throws Exception thrown if the KDC setup fails
*/
@BeforeClass
public static void initKdc() throws Exception {
Properties kdcConf = MiniKdc.createConf();
kdc = new MiniKdc(kdcConf, ROOT_TEST_DIR);
kdc.start();
File sinkKeytabFile = new File(ROOT_TEST_DIR, "sink.keytab");
sinkKeytab = sinkKeytabFile.getAbsolutePath();
kdc.createPrincipal(sinkKeytabFile, "sink/localhost");
sinkPrincipal = "sink/localhost@" + kdc.getRealm();
File hdfsKeytabFile = new File(ROOT_TEST_DIR, "hdfs.keytab");
hdfsKeytab = hdfsKeytabFile.getAbsolutePath();
kdc.createPrincipal(hdfsKeytabFile, "hdfs/localhost",
"HTTP/localhost");
hdfsPrincipal = "hdfs/localhost@" + kdc.getRealm();
spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm();
}
/**
* Setup the mini-DFS cluster.
*
* @throws Exception thrown if the cluster setup fails
*/
@Before
public void initCluster() throws Exception {
HdfsConfiguration conf = createSecureConfig("authentication,privacy");
RollingFileSystemSink.hasFlushed = false;
RollingFileSystemSink.suppliedConf = conf;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
.build();
cluster.waitActive();
createDirectoriesSecurely();
}
/**
* Stop the mini-DFS cluster.
*/
@After
public void stopCluster() {
if (cluster != null) {
cluster.shutdown();
}
// Restore non-secure conf
UserGroupInformation.setConfiguration(new Configuration());
RollingFileSystemSink.suppliedConf = null;
RollingFileSystemSink.suppliedFilesystem = null;
}
/**
* Stop the mini-KDC.
*/
@AfterClass
public static void shutdownKdc() {
if (kdc != null) {
kdc.stop();
}
}
/** /**
* Do a basic write test against an HDFS cluster with Kerberos enabled. We * Do a basic write test against an HDFS cluster with Kerberos enabled. We
@ -76,26 +148,10 @@ public class TestRollingFileSystemSinkWithSecureHdfs
*/ */
@Test @Test
public void testWithSecureHDFS() throws Exception { public void testWithSecureHDFS() throws Exception {
RollingFileSystemSink.flushQuickly = false;
RollingFileSystemSink.hasFlushed = false;
initKdc();
MiniDFSCluster cluster = null;
try {
HdfsConfiguration conf = createSecureConfig("authentication,privacy");
RollingFileSystemSink.suppliedConf = conf;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
.build();
cluster.waitActive();
UserGroupInformation sink = createDirectoriesSecurely(cluster);
final String path = final String path =
"hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test"; "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test";
final MetricsSystem ms = initMetricsSystem(path, true, false, true); final MetricsSystem ms =
initMetricsSystem(path, true, false, true);
assertMetricsContents( assertMetricsContents(
sink.doAs(new PrivilegedExceptionAction<String>() { sink.doAs(new PrivilegedExceptionAction<String>() {
@ -104,18 +160,6 @@ public String run() throws Exception {
return doWriteTest(ms, path, 1); return doWriteTest(ms, path, 1);
} }
})); }));
} finally {
if (cluster != null) {
cluster.shutdown();
}
shutdownKdc();
// Restore non-secure conf
UserGroupInformation.setConfiguration(new Configuration());
RollingFileSystemSink.suppliedConf = null;
RollingFileSystemSink.suppliedFilesystem = null;
}
} }
/** /**
@ -126,54 +170,25 @@ public String run() throws Exception {
*/ */
@Test @Test
public void testMissingPropertiesWithSecureHDFS() throws Exception { public void testMissingPropertiesWithSecureHDFS() throws Exception {
RollingFileSystemSink.flushQuickly = false;
RollingFileSystemSink.hasFlushed = false;
initKdc();
MiniDFSCluster cluster = null;
try {
HdfsConfiguration conf = createSecureConfig("authentication,privacy");
RollingFileSystemSink.suppliedConf = conf;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
.build();
final String path = final String path =
"hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test"; "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test";
createDirectoriesSecurely(cluster);
initMetricsSystem(path, true, false); initMetricsSystem(path, true, false);
assertTrue("No exception was generated initializing the sink against a " assertTrue("No exception was generated initializing the sink against a "
+ "secure cluster even though the principal and keytab properties " + "secure cluster even though the principal and keytab properties "
+ "were missing", MockSink.errored); + "were missing", MockSink.errored);
} finally {
if (cluster != null) {
cluster.shutdown();
}
shutdownKdc();
// Restore non-secure conf
UserGroupInformation.setConfiguration(new Configuration());
RollingFileSystemSink.suppliedConf = null;
RollingFileSystemSink.suppliedFilesystem = null;
}
} }
/** /**
* Create the /tmp directory as <i>hdfs</i> and /tmp/test as <i>sink</i> and * Create the /tmp directory as <i>hdfs</i> and /tmp/test as <i>sink</i> and
* return the UGI for <i>sink</i>. * return the UGI for <i>sink</i>.
* *
* @param cluster the mini-cluster
* @return the UGI for <i>sink</i>
* @throws IOException thrown if login or directory creation fails * @throws IOException thrown if login or directory creation fails
* @throws InterruptedException thrown if interrupted while creating a * @throws InterruptedException thrown if interrupted while creating a
* file system handle * file system handle
*/ */
protected UserGroupInformation createDirectoriesSecurely(final MiniDFSCluster cluster) protected void createDirectoriesSecurely()
throws IOException, InterruptedException { throws IOException, InterruptedException {
Path tmp = new Path("/tmp"); Path tmp = new Path("/tmp");
Path test = new Path(tmp, "test"); Path test = new Path(tmp, "test");
@ -192,9 +207,9 @@ public FileSystem run() throws Exception {
fsForSuperUser.mkdirs(tmp); fsForSuperUser.mkdirs(tmp);
fsForSuperUser.setPermission(tmp, new FsPermission((short)0777)); fsForSuperUser.setPermission(tmp, new FsPermission((short)0777));
UserGroupInformation sink = sink = UserGroupInformation.loginUserFromKeytabAndReturnUGI(sinkPrincipal,
UserGroupInformation.loginUserFromKeytabAndReturnUGI(sinkPrincipal,
sinkKeytab); sinkKeytab);
FileSystem fsForSink = FileSystem fsForSink =
sink.doAs(new PrivilegedExceptionAction<FileSystem>() { sink.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override @Override
@ -205,40 +220,6 @@ public FileSystem run() throws Exception {
fsForSink.mkdirs(test); fsForSink.mkdirs(test);
RollingFileSystemSink.suppliedFilesystem = fsForSink; RollingFileSystemSink.suppliedFilesystem = fsForSink;
return sink;
}
/**
* Setup the KDC for testing a secure HDFS cluster
*
* @throws Exception thrown if the KDC setup fails
*/
public static void initKdc() throws Exception {
Properties kdcConf = MiniKdc.createConf();
kdc = new MiniKdc(kdcConf, methodDir);
kdc.start();
File sinkKeytabFile = new File(methodDir, "sink.keytab");
sinkKeytab = sinkKeytabFile.getAbsolutePath();
kdc.createPrincipal(sinkKeytabFile, "sink/localhost");
sinkPrincipal = "sink/localhost@" + kdc.getRealm();
File hdfsKeytabFile = new File(methodDir, "hdfs.keytab");
hdfsKeytab = hdfsKeytabFile.getAbsolutePath();
kdc.createPrincipal(hdfsKeytabFile, "hdfs/localhost",
"HTTP/localhost");
hdfsPrincipal = "hdfs/localhost@" + kdc.getRealm();
spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm();
}
/**
* Stop the mini-KDC.
*/
public static void shutdownKdc() {
if (kdc != null) {
kdc.stop();
}
} }
/** /**