HADOOP-10840. Fix OutOfMemoryError caused by metrics system in Azure File System. Contributed by Shanyu Zhao.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611247 13f79535-47bb-0310-9956-ffa450edef68
(cherry picked from commit 0a02b5a19b
)
Conflicts:
hadoop-common-project/hadoop-common/CHANGES.txt
This commit is contained in:
parent
b928b8c775
commit
02a3bf8bda
|
@ -68,6 +68,9 @@ Release 2.7.0 - UNRELEASED
|
|||
|
||||
HADOOP-11416. Move ChunkedArrayList into hadoop-common (cmccabe)
|
||||
|
||||
HADOOP-10840. Fix OutOfMemoryError caused by metrics system in Azure File
|
||||
System. (Shanyu Zhao via cnauroth)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-11323. WritableComparator#compare keeps reference to byte array.
|
||||
|
|
|
@ -373,6 +373,8 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
private Path workingDir;
|
||||
private long blockSize = MAX_AZURE_BLOCK_SIZE;
|
||||
private AzureFileSystemInstrumentation instrumentation;
|
||||
private String metricsSourceName;
|
||||
private boolean isClosed = false;
|
||||
private static boolean suppressRetryPolicy = false;
|
||||
// A counter to create unique (within-process) names for my metrics sources.
|
||||
private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
|
||||
|
@ -482,11 +484,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
|
||||
// Make sure the metrics system is available before interacting with Azure
|
||||
AzureFileSystemMetricsSystem.fileSystemStarted();
|
||||
String sourceName = newMetricsSourceName(),
|
||||
sourceDesc = "Azure Storage Volume File System metrics";
|
||||
instrumentation = DefaultMetricsSystem.instance().register(sourceName,
|
||||
sourceDesc, new AzureFileSystemInstrumentation(conf));
|
||||
AzureFileSystemMetricsSystem.registerSource(sourceName, sourceDesc,
|
||||
metricsSourceName = newMetricsSourceName();
|
||||
String sourceDesc = "Azure Storage Volume File System metrics";
|
||||
instrumentation = new AzureFileSystemInstrumentation(conf);
|
||||
AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc,
|
||||
instrumentation);
|
||||
|
||||
store.initialize(uri, conf, instrumentation);
|
||||
|
@ -502,7 +503,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
LOG.debug(" blockSize = "
|
||||
+ conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private NativeFileSystemStore createDefaultStore(Configuration conf) {
|
||||
|
@ -1337,7 +1337,11 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
public synchronized void close() throws IOException {
|
||||
if (isClosed) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Call the base close() to close any resources there.
|
||||
super.close();
|
||||
// Close the store
|
||||
|
@ -1349,12 +1353,14 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
AzureFileSystemMetricsSystem.unregisterSource(metricsSourceName);
|
||||
AzureFileSystemMetricsSystem.fileSystemClosed();
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Submitting metrics when file system closed took "
|
||||
+ (System.currentTimeMillis() - startTime) + " ms.");
|
||||
}
|
||||
isClosed = true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1498,6 +1504,13 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
handleFilesWithDanglingTempData(root, new DanglingFileDeleter());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finalize() throws Throwable {
|
||||
LOG.debug("finalize() called.");
|
||||
close();
|
||||
super.finalize();
|
||||
}
|
||||
|
||||
/**
|
||||
* Encode the key with a random prefix for load balancing in Azure storage.
|
||||
* Upload data to a random temporary file then do storage side renaming to
|
||||
|
|
|
@ -44,21 +44,26 @@ public final class AzureFileSystemMetricsSystem {
|
|||
}
|
||||
|
||||
public static synchronized void fileSystemClosed() {
|
||||
if (instance != null) {
|
||||
instance.publishMetricsNow();
|
||||
}
|
||||
if (numFileSystems == 1) {
|
||||
instance.publishMetricsNow();
|
||||
instance.stop();
|
||||
instance.shutdown();
|
||||
instance = null;
|
||||
}
|
||||
numFileSystems--;
|
||||
}
|
||||
|
||||
|
||||
public static void registerSource(String name, String desc,
|
||||
MetricsSource source) {
|
||||
// Register the source with the name appended with -WasbSystem
|
||||
// so that the name is globally unique.
|
||||
instance.register(name + "-WasbSystem", desc, source);
|
||||
//caller has to use unique name to register source
|
||||
instance.register(name, desc, source);
|
||||
}
|
||||
|
||||
public static synchronized void unregisterSource(String name) {
|
||||
if (instance != null) {
|
||||
//publish metrics before unregister a metrics source
|
||||
instance.publishMetricsNow();
|
||||
instance.unregisterSource(name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -324,9 +324,7 @@ public final class AzureBlobStorageTestAccount {
|
|||
String sourceName = NativeAzureFileSystem.newMetricsSourceName();
|
||||
String sourceDesc = "Azure Storage Volume File System metrics";
|
||||
|
||||
AzureFileSystemInstrumentation instrumentation =
|
||||
DefaultMetricsSystem.instance().register(sourceName,
|
||||
sourceDesc, new AzureFileSystemInstrumentation(conf));
|
||||
AzureFileSystemInstrumentation instrumentation = new AzureFileSystemInstrumentation(conf);
|
||||
|
||||
AzureFileSystemMetricsSystem.registerSource(
|
||||
sourceName, sourceDesc, instrumentation);
|
||||
|
|
|
@ -516,6 +516,13 @@ public abstract class NativeAzureFileSystemBaseTest {
|
|||
assertNotNull(status);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseFileSystemTwice() throws Exception {
|
||||
//make sure close() can be called multiple times without doing any harm
|
||||
fs.close();
|
||||
fs.close();
|
||||
}
|
||||
|
||||
private boolean testModifiedTime(Path testPath, long time) throws Exception {
|
||||
FileStatus fileStatus = fs.getFileStatus(testPath);
|
||||
final long errorMargin = modifiedTimeErrorMargin;
|
||||
|
|
Loading…
Reference in New Issue