From 533a2be5ac7c7f0473fdd24d6201582d08964e21 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Fri, 23 Oct 2015 13:52:59 -0700 Subject: [PATCH] HDFS-9264. Minor cleanup of operations on FsVolumeList#volumes. (Walter Su via lei) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../datanode/fsdataset/impl/FsVolumeList.java | 109 ++++++------------ 2 files changed, 38 insertions(+), 74 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index cf6558fbdfb..066ae02b137 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1560,6 +1560,9 @@ Release 2.8.0 - UNRELEASED HDFS-7087. Ability to list /.reserved. (Xiao Chen via wang) + HDFS-9264. Minor cleanup of operations on FsVolumeList#volumes. + (Walter Su via lei) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index a73e12953d1..608ee291b41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -30,9 +29,8 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.CopyOnWriteArrayList; -import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; @@ -46,8 +44,8 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.Time; class FsVolumeList { - private final AtomicReference volumes = - new AtomicReference<>(new FsVolumeImpl[0]); + private final CopyOnWriteArrayList volumes = + new CopyOnWriteArrayList<>(); // Tracks volume failures, sorted by volume path. private final Map volumeFailureInfos = Collections.synchronizedMap(new TreeMap()); @@ -71,7 +69,7 @@ class FsVolumeList { * Return an immutable list view of all the volumes. */ List getVolumes() { - return Collections.unmodifiableList(Arrays.asList(volumes.get())); + return Collections.unmodifiableList(volumes); } private FsVolumeReference chooseVolume(List list, long blockSize) @@ -98,10 +96,8 @@ class FsVolumeList { */ FsVolumeReference getNextVolume(StorageType storageType, long blockSize) throws IOException { - // Get a snapshot of currently available volumes. - final FsVolumeImpl[] curVolumes = volumes.get(); - final List list = new ArrayList<>(curVolumes.length); - for(FsVolumeImpl v : curVolumes) { + final List list = new ArrayList<>(volumes.size()); + for(FsVolumeImpl v : volumes) { if (v.getStorageType() == storageType) { list.add(v); } @@ -129,7 +125,7 @@ class FsVolumeList { long getDfsUsed() throws IOException { long dfsUsed = 0L; - for (FsVolumeImpl v : volumes.get()) { + for (FsVolumeImpl v : volumes) { try(FsVolumeReference ref = v.obtainReference()) { dfsUsed += v.getDfsUsed(); } catch (ClosedChannelException e) { @@ -141,7 +137,7 @@ class FsVolumeList { long getBlockPoolUsed(String bpid) throws IOException { long dfsUsed = 0L; - for (FsVolumeImpl v : volumes.get()) { + for (FsVolumeImpl v : volumes) { try (FsVolumeReference ref = v.obtainReference()) { dfsUsed += v.getBlockPoolUsed(bpid); } catch (ClosedChannelException e) { @@ -153,7 +149,7 @@ class FsVolumeList { long getCapacity() { long capacity = 0L; - for (FsVolumeImpl v : volumes.get()) { + for (FsVolumeImpl v : volumes) { try (FsVolumeReference ref = v.obtainReference()) { capacity += v.getCapacity(); } catch (IOException e) { @@ -165,7 +161,7 @@ class FsVolumeList { long getRemaining() throws IOException { long remaining = 0L; - for (FsVolumeSpi vol : volumes.get()) { + for (FsVolumeSpi vol : volumes) { try (FsVolumeReference ref = vol.obtainReference()) { remaining += vol.getAvailable(); } catch (ClosedChannelException e) { @@ -183,7 +179,7 @@ class FsVolumeList { final List exceptions = Collections.synchronizedList( new ArrayList()); List replicaAddingThreads = new ArrayList(); - for (final FsVolumeImpl v : volumes.get()) { + for (final FsVolumeImpl v : volumes) { Thread t = new Thread() { public void run() { try (FsVolumeReference ref = v.obtainReference()) { @@ -267,7 +263,7 @@ class FsVolumeList { @Override public String toString() { - return Arrays.toString(volumes.get()); + return volumes.toString(); } /** @@ -277,21 +273,7 @@ class FsVolumeList { */ void addVolume(FsVolumeReference ref) { FsVolumeImpl volume = (FsVolumeImpl) ref.getVolume(); - while (true) { - final FsVolumeImpl[] curVolumes = volumes.get(); - final List volumeList = Lists.newArrayList(curVolumes); - volumeList.add(volume); - if (volumes.compareAndSet(curVolumes, - volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) { - break; - } else { - if (FsDatasetImpl.LOG.isDebugEnabled()) { - FsDatasetImpl.LOG.debug( - "The volume list has been changed concurrently, " + - "retry to remove volume: " + ref.getVolume().getStorageID()); - } - } - } + volumes.add(volume); if (blockScanner != null) { blockScanner.addVolumeScanner(ref); } else { @@ -311,37 +293,22 @@ class FsVolumeList { * @param target the volume instance to be removed. */ private void removeVolume(FsVolumeImpl target) { - while (true) { - final FsVolumeImpl[] curVolumes = volumes.get(); - final List volumeList = Lists.newArrayList(curVolumes); - if (volumeList.remove(target)) { - if (volumes.compareAndSet(curVolumes, - volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) { - if (blockScanner != null) { - blockScanner.removeVolumeScanner(target); - } - try { - target.closeAndWait(); - } catch (IOException e) { - FsDatasetImpl.LOG.warn( - "Error occurs when waiting volume to close: " + target, e); - } - target.shutdown(); - FsDatasetImpl.LOG.info("Removed volume: " + target); - break; - } else { - if (FsDatasetImpl.LOG.isDebugEnabled()) { - FsDatasetImpl.LOG.debug( - "The volume list has been changed concurrently, " + - "retry to remove volume: " + target); - } - } - } else { - if (FsDatasetImpl.LOG.isDebugEnabled()) { - FsDatasetImpl.LOG.debug("Volume " + target + - " does not exist or is removed by others."); - } - break; + if (volumes.remove(target)) { + if (blockScanner != null) { + blockScanner.removeVolumeScanner(target); + } + try { + target.closeAndWait(); + } catch (IOException e) { + FsDatasetImpl.LOG.warn( + "Error occurs when waiting volume to close: " + target, e); + } + target.shutdown(); + FsDatasetImpl.LOG.info("Removed volume: " + target); + } else { + if (FsDatasetImpl.LOG.isDebugEnabled()) { + FsDatasetImpl.LOG.debug("Volume " + target + + " does not exist or is removed by others."); } } } @@ -352,16 +319,10 @@ class FsVolumeList { * @param clearFailure set true to remove failure info for this volume. */ void removeVolume(File volume, boolean clearFailure) { - // Make a copy of volumes to remove one volume. - final FsVolumeImpl[] curVolumes = volumes.get(); - final List volumeList = Lists.newArrayList(curVolumes); - for (Iterator it = volumeList.iterator(); it.hasNext(); ) { - FsVolumeImpl fsVolume = it.next(); - String basePath, targetPath; - basePath = new File(fsVolume.getBasePath()).getAbsolutePath(); - targetPath = volume.getAbsolutePath(); + for (FsVolumeImpl fsVolume : volumes) { + String basePath = new File(fsVolume.getBasePath()).getAbsolutePath(); + String targetPath = volume.getAbsolutePath(); if (basePath.equals(targetPath)) { - // Make sure the removed volume is the one in the curVolumes. removeVolume(fsVolume); } } @@ -397,7 +358,7 @@ class FsVolumeList { final List exceptions = Collections.synchronizedList( new ArrayList()); List blockPoolAddingThreads = new ArrayList(); - for (final FsVolumeImpl v : volumes.get()) { + for (final FsVolumeImpl v : volumes) { Thread t = new Thread() { public void run() { try (FsVolumeReference ref = v.obtainReference()) { @@ -438,13 +399,13 @@ class FsVolumeList { void removeBlockPool(String bpid, Map blocksPerVolume) { - for (FsVolumeImpl v : volumes.get()) { + for (FsVolumeImpl v : volumes) { v.shutdownBlockPool(bpid, blocksPerVolume.get(v.toDatanodeStorage())); } } void shutdown() { - for (FsVolumeImpl volume : volumes.get()) { + for (FsVolumeImpl volume : volumes) { if(volume != null) { volume.shutdown(); }