HADOOP-10496. Merging change r1587141 from trunk to branch-2.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1587144 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
065a09c8fc
commit
e390be170f
|
@ -60,6 +60,8 @@ Release 2.5.0 - UNRELEASED
|
||||||
HADOOP-10495. TestFileUtil fails on Windows due to bad permission
|
HADOOP-10495. TestFileUtil fails on Windows due to bad permission
|
||||||
assertions. (cnauroth)
|
assertions. (cnauroth)
|
||||||
|
|
||||||
|
HADOOP-10496. Metrics system FileSink can leak file descriptor. (cnauroth)
|
||||||
|
|
||||||
Release 2.4.1 - UNRELEASED
|
Release 2.4.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -18,11 +18,19 @@
|
||||||
|
|
||||||
package org.apache.hadoop.metrics2;
|
package org.apache.hadoop.metrics2;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The metrics sink interface
|
* The metrics sink interface. <p>
|
||||||
|
* Implementations of this interface consume the {@link MetricsRecord} generated
|
||||||
|
* from {@link MetricsSource}. It registers with {@link MetricsSystem} which
|
||||||
|
* periodically pushes the {@link MetricsRecord} to the sink using
|
||||||
|
* {@link #putMetrics(MetricsRecord)} method. If the implementing class also
|
||||||
|
* implements {@link Closeable}, then the MetricsSystem will close the sink when
|
||||||
|
* it is stopped.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.metrics2.impl;
|
package org.apache.hadoop.metrics2.impl;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
|
@ -25,6 +26,7 @@ import static com.google.common.base.Preconditions.*;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
||||||
|
@ -202,6 +204,9 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
|
||||||
catch (InterruptedException e) {
|
catch (InterruptedException e) {
|
||||||
LOG.warn("Stop interrupted", e);
|
LOG.warn("Stop interrupted", e);
|
||||||
}
|
}
|
||||||
|
if (sink instanceof Closeable) {
|
||||||
|
IOUtils.cleanup(LOG, (Closeable)sink);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
String name() {
|
String name() {
|
||||||
|
|
|
@ -18,8 +18,10 @@
|
||||||
|
|
||||||
package org.apache.hadoop.metrics2.sink;
|
package org.apache.hadoop.metrics2.sink;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileWriter;
|
import java.io.FileWriter;
|
||||||
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
|
|
||||||
import org.apache.commons.configuration.SubsetConfiguration;
|
import org.apache.commons.configuration.SubsetConfiguration;
|
||||||
|
@ -36,7 +38,7 @@ import org.apache.hadoop.metrics2.MetricsTag;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class FileSink implements MetricsSink {
|
public class FileSink implements MetricsSink, Closeable {
|
||||||
private static final String FILENAME_KEY = "filename";
|
private static final String FILENAME_KEY = "filename";
|
||||||
private PrintWriter writer;
|
private PrintWriter writer;
|
||||||
|
|
||||||
|
@ -82,4 +84,9 @@ public class FileSink implements MetricsSink {
|
||||||
public void flush() {
|
public void flush() {
|
||||||
writer.flush();
|
writer.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
writer.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,10 +106,17 @@ public class TestFileSink {
|
||||||
ms.stop();
|
ms.stop();
|
||||||
ms.shutdown();
|
ms.shutdown();
|
||||||
|
|
||||||
InputStream is = new FileInputStream(outFile);
|
InputStream is = null;
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream((int)outFile.length());
|
ByteArrayOutputStream baos = null;
|
||||||
|
String outFileContent = null;
|
||||||
|
try {
|
||||||
|
is = new FileInputStream(outFile);
|
||||||
|
baos = new ByteArrayOutputStream((int)outFile.length());
|
||||||
IOUtils.copyBytes(is, baos, 1024, true);
|
IOUtils.copyBytes(is, baos, 1024, true);
|
||||||
String outFileContent = new String(baos.toByteArray(), "UTF-8");
|
outFileContent = new String(baos.toByteArray(), "UTF-8");
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(null, baos, is);
|
||||||
|
}
|
||||||
|
|
||||||
// Check the out file content. Should be something like the following:
|
// Check the out file content. Should be something like the following:
|
||||||
//1360244820087 test1.testRecord1: Context=test1, testTag1=testTagValue1, testTag2=testTagValue2, Hostname=myhost, testMetric1=1, testMetric2=2
|
//1360244820087 test1.testRecord1: Context=test1, testTag1=testTagValue1, testTag2=testTagValue2, Hostname=myhost, testMetric1=1, testMetric2=2
|
||||||
|
|
Loading…
Reference in New Issue