From 7b1ac5ac13f5af483065788e59807fe08d2299f3 Mon Sep 17 00:00:00 2001 From: Hairong Kuang Date: Thu, 1 Apr 2010 20:58:10 +0000 Subject: [PATCH] HADOOP-6640. FileSystem.get() does RPC retries within a static synchronized block. Contributed by Hairong Kuang. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@930096 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 ++ src/java/org/apache/hadoop/fs/FileSystem.java | 27 ++++++++--- .../hadoop/fs/TestFileSystemCaching.java | 45 +++++++++++++++++++ 3 files changed, 68 insertions(+), 7 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index e761d82c746..9152ae818f5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -306,6 +306,9 @@ Trunk (unreleased changes) HADOOP-6654. Fix code example in WritableComparable javadoc. (Tom White via szetszwo) + HADOOP-6640. FileSystem.get() does RPC retries within a static + synchronized block. (hairong) + Release 0.21.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/src/java/org/apache/hadoop/fs/FileSystem.java b/src/java/org/apache/hadoop/fs/FileSystem.java index be1cdc88306..95ff7bf4769 100644 --- a/src/java/org/apache/hadoop/fs/FileSystem.java +++ b/src/java/org/apache/hadoop/fs/FileSystem.java @@ -1758,32 +1758,45 @@ public abstract class FileSystem extends Configured implements Closeable { /** A variable that makes all objects in the cache unique */ private static AtomicLong unique = new AtomicLong(1); - synchronized FileSystem get(URI uri, Configuration conf) throws IOException{ + FileSystem get(URI uri, Configuration conf) throws IOException{ Key key = new Key(uri, conf); return getInternal(uri, conf, key); } /** The objects inserted into the cache using this method are all unique */ - synchronized FileSystem getUnique(URI uri, Configuration conf) throws IOException{ + FileSystem getUnique(URI uri, Configuration conf) throws IOException{ Key key = new Key(uri, conf, unique.getAndIncrement()); return getInternal(uri, conf, key); } private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{ - FileSystem fs = map.get(key); - if (fs == null) { - fs = createFileSystem(uri, conf); + FileSystem fs; + synchronized (this) { + fs = map.get(key); + } + if (fs != null) { + return fs; + } + + fs = createFileSystem(uri, conf); + synchronized (this) { // refetch the lock again + FileSystem oldfs = map.get(key); + if (oldfs != null) { // a file system is created while lock is releasing + fs.close(); // close the new file system + return oldfs; // return the old file system + } + + // now insert the new file system into the map if (map.isEmpty() && !clientFinalizer.isAlive()) { Runtime.getRuntime().addShutdownHook(clientFinalizer); } fs.key = key; map.put(key, fs); - if (conf.getBoolean("fs.automatic.close", true)) { toAutoClose.add(key); } + return fs; } - return fs; } synchronized void remove(Key key, FileSystem fs) { diff --git a/src/test/core/org/apache/hadoop/fs/TestFileSystemCaching.java b/src/test/core/org/apache/hadoop/fs/TestFileSystemCaching.java index 8caeeb134a8..1fb66f583a9 100644 --- a/src/test/core/org/apache/hadoop/fs/TestFileSystemCaching.java +++ b/src/test/core/org/apache/hadoop/fs/TestFileSystemCaching.java @@ -21,7 +21,9 @@ package org.apache.hadoop.fs; import static junit.framework.Assert.assertSame; import static junit.framework.Assert.assertNotSame; +import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; @@ -46,6 +48,49 @@ public class TestFileSystemCaching { assertSame(fs1, fs2); } + public static class InitializeForeverFileSystem extends LocalFileSystem { + public void initialize(URI uri, Configuration conf) throws IOException { + // notify that InitializeForeverFileSystem started initialization + synchronized (conf) { + conf.notify(); + } + try { + while (true) { + Thread.sleep(1000); + } + } catch (InterruptedException e) { + return; + } + } + } + + @Test + public void testCacheEnabledWithInitializeForeverFS() throws Exception { + final Configuration conf = new Configuration(); + Thread t = new Thread() { + public void run() { + conf.set("fs.localfs1.impl", "org.apache.hadoop.fs." + + "TestFileSystemCaching$InitializeForeverFileSystem"); + try { + FileSystem.get(new URI("localfs1://a"), conf); + } catch (IOException e) { + e.printStackTrace(); + } catch (URISyntaxException e) { + e.printStackTrace(); + } + } + }; + t.start(); + // wait for InitializeForeverFileSystem to start initialization + synchronized (conf) { + conf.wait(); + } + conf.set("fs.cachedfile.impl", conf.get("fs.file.impl")); + FileSystem.get(new URI("cachedfile://a"), conf); + t.interrupt(); + t.join(); + } + @Test public void testCacheDisabled() throws Exception { Configuration conf = new Configuration();