diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 7950b09078b..3d60e9567ba 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -384,6 +384,9 @@ Release 2.4.0 - UNRELEASED HADOOP-10368. InputStream is not closed in VersionInfo ctor. (Tsuyoshi OZAWA via szetszwo) + HADOOP-10368. FsUrlStreamHandlerFactory is not thread safe. + (Tudor Scurtu via cnauroth) + BREAKDOWN OF HADOOP-10184 SUBTASKS AND RELATED JIRAS HADOOP-10185. FileSystem API for ACLs. (cnauroth) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsUrlStreamHandlerFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsUrlStreamHandlerFactory.java index 9c212a4a59d..91a527dce7e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsUrlStreamHandlerFactory.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsUrlStreamHandlerFactory.java @@ -19,8 +19,8 @@ package org.apache.hadoop.fs; import java.io.IOException; import java.net.URLStreamHandlerFactory; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -45,7 +45,8 @@ public class FsUrlStreamHandlerFactory implements private Configuration conf; // This map stores whether a protocol is know or not by FileSystem - private Map protocols = new HashMap(); + private Map protocols = + new ConcurrentHashMap(); // The URL Stream handler private java.net.URLStreamHandler handler; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUrlStreamHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUrlStreamHandler.java index 845eb6314ef..0871f6edd9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUrlStreamHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUrlStreamHandler.java @@ -35,7 +35,7 @@ import org.apache.hadoop.test.PathUtils; import org.junit.Test; /** - * Test of the URL stream handler factory. + * Test of the URL stream handler. */ public class TestUrlStreamHandler { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUrlStreamHandlerFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUrlStreamHandlerFactory.java new file mode 100644 index 00000000000..910fee2b071 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUrlStreamHandlerFactory.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Test of the URL stream handler factory. + */ +public class TestUrlStreamHandlerFactory { + + private static final int RUNS = 20; + private static final int THREADS = 10; + private static final int TASKS = 200; + private static final int TIMEOUT = 30; + + @Test + public void testConcurrency() throws Exception { + for (int i = 0; i < RUNS; i++) { + singleRun(); + } + } + + private void singleRun() throws Exception { + final FsUrlStreamHandlerFactory factory = new FsUrlStreamHandlerFactory(); + final Random random = new Random(); + ExecutorService executor = Executors.newFixedThreadPool(THREADS); + ArrayList> futures = new ArrayList>(TASKS); + + for (int i = 0; i < TASKS ; i++) { + final int aux = i; + futures.add(executor.submit(new Runnable() { + @Override + public void run() { + int rand = aux + random.nextInt(3); + factory.createURLStreamHandler(String.valueOf(rand)); + } + })); + } + + executor.shutdown(); + try { + executor.awaitTermination(TIMEOUT, TimeUnit.SECONDS); + executor.shutdownNow(); + } catch (InterruptedException e) { + // pass + } + + // check for exceptions + for (Future future : futures) { + if (!future.isDone()) { + break; // timed out + } + future.get(); + } + } +}