From dba7a7dd9d70adfab36a78eb55059c54e553a5cb Mon Sep 17 00:00:00 2001 From: Kai Zheng Date: Tue, 19 Sep 2017 17:45:41 +0800 Subject: [PATCH] HDFS-12479. Some misuses of lock in DFSStripedOutputStream. Contributed by Huafeng Wang --- .../org/apache/hadoop/hdfs/DFSStripedOutputStream.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 44db3a68245..66eec7a360f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -63,6 +63,7 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; @@ -85,11 +86,10 @@ public class DFSStripedOutputStream extends DFSOutputStream private final List> queues; MultipleBlockingQueue(int numQueue, int queueSize) { - List> list = new ArrayList<>(numQueue); + queues = new ArrayList<>(numQueue); for (int i = 0; i < numQueue; i++) { - list.add(new LinkedBlockingQueue(queueSize)); + queues.add(new LinkedBlockingQueue(queueSize)); } - queues = Collections.synchronizedList(list); } void offer(int i, T object) { @@ -156,8 +156,7 @@ public class DFSStripedOutputStream extends DFSOutputStream followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); endBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); - updateStreamerMap = Collections.synchronizedMap( - new HashMap(numAllBlocks)); + updateStreamerMap = new ConcurrentHashMap<>(numAllBlocks); streamerUpdateResult = new MultipleBlockingQueue<>(numAllBlocks, 1); }