HDFS-9264. Minor cleanup of operations on FsVolumeList#volumes. (Walter Su via lei)

(cherry picked from commit 533a2be5ac)
This commit is contained in:
Lei Xu 2015-10-23 13:52:59 -07:00
parent 63cebf57d0
commit 17be4b4c1f
2 changed files with 38 additions and 74 deletions

View File

@ -724,6 +724,9 @@ Release 2.8.0 - UNRELEASED
HDFS-7087. Ability to list /.reserved. (Xiao Chen via wang) HDFS-7087. Ability to list /.reserved. (Xiao Chen via wang)
HDFS-9264. Minor cleanup of operations on FsVolumeList#volumes.
(Walter Su via lei)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -21,7 +21,6 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -30,9 +29,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.CopyOnWriteArrayList;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@ -46,8 +44,8 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
class FsVolumeList { class FsVolumeList {
private final AtomicReference<FsVolumeImpl[]> volumes = private final CopyOnWriteArrayList<FsVolumeImpl> volumes =
new AtomicReference<>(new FsVolumeImpl[0]); new CopyOnWriteArrayList<>();
// Tracks volume failures, sorted by volume path. // Tracks volume failures, sorted by volume path.
private final Map<String, VolumeFailureInfo> volumeFailureInfos = private final Map<String, VolumeFailureInfo> volumeFailureInfos =
Collections.synchronizedMap(new TreeMap<String, VolumeFailureInfo>()); Collections.synchronizedMap(new TreeMap<String, VolumeFailureInfo>());
@ -71,7 +69,7 @@ class FsVolumeList {
* Return an immutable list view of all the volumes. * Return an immutable list view of all the volumes.
*/ */
List<FsVolumeImpl> getVolumes() { List<FsVolumeImpl> getVolumes() {
return Collections.unmodifiableList(Arrays.asList(volumes.get())); return Collections.unmodifiableList(volumes);
} }
private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, long blockSize) private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, long blockSize)
@ -98,10 +96,8 @@ class FsVolumeList {
*/ */
FsVolumeReference getNextVolume(StorageType storageType, long blockSize) FsVolumeReference getNextVolume(StorageType storageType, long blockSize)
throws IOException { throws IOException {
// Get a snapshot of currently available volumes. final List<FsVolumeImpl> list = new ArrayList<>(volumes.size());
final FsVolumeImpl[] curVolumes = volumes.get(); for(FsVolumeImpl v : volumes) {
final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.length);
for(FsVolumeImpl v : curVolumes) {
if (v.getStorageType() == storageType) { if (v.getStorageType() == storageType) {
list.add(v); list.add(v);
} }
@ -129,7 +125,7 @@ class FsVolumeList {
long getDfsUsed() throws IOException { long getDfsUsed() throws IOException {
long dfsUsed = 0L; long dfsUsed = 0L;
for (FsVolumeImpl v : volumes.get()) { for (FsVolumeImpl v : volumes) {
try(FsVolumeReference ref = v.obtainReference()) { try(FsVolumeReference ref = v.obtainReference()) {
dfsUsed += v.getDfsUsed(); dfsUsed += v.getDfsUsed();
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
@ -141,7 +137,7 @@ class FsVolumeList {
long getBlockPoolUsed(String bpid) throws IOException { long getBlockPoolUsed(String bpid) throws IOException {
long dfsUsed = 0L; long dfsUsed = 0L;
for (FsVolumeImpl v : volumes.get()) { for (FsVolumeImpl v : volumes) {
try (FsVolumeReference ref = v.obtainReference()) { try (FsVolumeReference ref = v.obtainReference()) {
dfsUsed += v.getBlockPoolUsed(bpid); dfsUsed += v.getBlockPoolUsed(bpid);
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
@ -153,7 +149,7 @@ class FsVolumeList {
long getCapacity() { long getCapacity() {
long capacity = 0L; long capacity = 0L;
for (FsVolumeImpl v : volumes.get()) { for (FsVolumeImpl v : volumes) {
try (FsVolumeReference ref = v.obtainReference()) { try (FsVolumeReference ref = v.obtainReference()) {
capacity += v.getCapacity(); capacity += v.getCapacity();
} catch (IOException e) { } catch (IOException e) {
@ -165,7 +161,7 @@ class FsVolumeList {
long getRemaining() throws IOException { long getRemaining() throws IOException {
long remaining = 0L; long remaining = 0L;
for (FsVolumeSpi vol : volumes.get()) { for (FsVolumeSpi vol : volumes) {
try (FsVolumeReference ref = vol.obtainReference()) { try (FsVolumeReference ref = vol.obtainReference()) {
remaining += vol.getAvailable(); remaining += vol.getAvailable();
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
@ -183,7 +179,7 @@ class FsVolumeList {
final List<IOException> exceptions = Collections.synchronizedList( final List<IOException> exceptions = Collections.synchronizedList(
new ArrayList<IOException>()); new ArrayList<IOException>());
List<Thread> replicaAddingThreads = new ArrayList<Thread>(); List<Thread> replicaAddingThreads = new ArrayList<Thread>();
for (final FsVolumeImpl v : volumes.get()) { for (final FsVolumeImpl v : volumes) {
Thread t = new Thread() { Thread t = new Thread() {
public void run() { public void run() {
try (FsVolumeReference ref = v.obtainReference()) { try (FsVolumeReference ref = v.obtainReference()) {
@ -267,7 +263,7 @@ class FsVolumeList {
@Override @Override
public String toString() { public String toString() {
return Arrays.toString(volumes.get()); return volumes.toString();
} }
/** /**
@ -277,21 +273,7 @@ class FsVolumeList {
*/ */
void addVolume(FsVolumeReference ref) { void addVolume(FsVolumeReference ref) {
FsVolumeImpl volume = (FsVolumeImpl) ref.getVolume(); FsVolumeImpl volume = (FsVolumeImpl) ref.getVolume();
while (true) { volumes.add(volume);
final FsVolumeImpl[] curVolumes = volumes.get();
final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
volumeList.add(volume);
if (volumes.compareAndSet(curVolumes,
volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
break;
} else {
if (FsDatasetImpl.LOG.isDebugEnabled()) {
FsDatasetImpl.LOG.debug(
"The volume list has been changed concurrently, " +
"retry to remove volume: " + ref.getVolume().getStorageID());
}
}
}
if (blockScanner != null) { if (blockScanner != null) {
blockScanner.addVolumeScanner(ref); blockScanner.addVolumeScanner(ref);
} else { } else {
@ -311,37 +293,22 @@ class FsVolumeList {
* @param target the volume instance to be removed. * @param target the volume instance to be removed.
*/ */
private void removeVolume(FsVolumeImpl target) { private void removeVolume(FsVolumeImpl target) {
while (true) { if (volumes.remove(target)) {
final FsVolumeImpl[] curVolumes = volumes.get(); if (blockScanner != null) {
final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes); blockScanner.removeVolumeScanner(target);
if (volumeList.remove(target)) { }
if (volumes.compareAndSet(curVolumes, try {
volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) { target.closeAndWait();
if (blockScanner != null) { } catch (IOException e) {
blockScanner.removeVolumeScanner(target); FsDatasetImpl.LOG.warn(
} "Error occurs when waiting volume to close: " + target, e);
try { }
target.closeAndWait(); target.shutdown();
} catch (IOException e) { FsDatasetImpl.LOG.info("Removed volume: " + target);
FsDatasetImpl.LOG.warn( } else {
"Error occurs when waiting volume to close: " + target, e); if (FsDatasetImpl.LOG.isDebugEnabled()) {
} FsDatasetImpl.LOG.debug("Volume " + target +
target.shutdown(); " does not exist or is removed by others.");
FsDatasetImpl.LOG.info("Removed volume: " + target);
break;
} else {
if (FsDatasetImpl.LOG.isDebugEnabled()) {
FsDatasetImpl.LOG.debug(
"The volume list has been changed concurrently, " +
"retry to remove volume: " + target);
}
}
} else {
if (FsDatasetImpl.LOG.isDebugEnabled()) {
FsDatasetImpl.LOG.debug("Volume " + target +
" does not exist or is removed by others.");
}
break;
} }
} }
} }
@ -352,16 +319,10 @@ class FsVolumeList {
* @param clearFailure set true to remove failure info for this volume. * @param clearFailure set true to remove failure info for this volume.
*/ */
void removeVolume(File volume, boolean clearFailure) { void removeVolume(File volume, boolean clearFailure) {
// Make a copy of volumes to remove one volume. for (FsVolumeImpl fsVolume : volumes) {
final FsVolumeImpl[] curVolumes = volumes.get(); String basePath = new File(fsVolume.getBasePath()).getAbsolutePath();
final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes); String targetPath = volume.getAbsolutePath();
for (Iterator<FsVolumeImpl> it = volumeList.iterator(); it.hasNext(); ) {
FsVolumeImpl fsVolume = it.next();
String basePath, targetPath;
basePath = new File(fsVolume.getBasePath()).getAbsolutePath();
targetPath = volume.getAbsolutePath();
if (basePath.equals(targetPath)) { if (basePath.equals(targetPath)) {
// Make sure the removed volume is the one in the curVolumes.
removeVolume(fsVolume); removeVolume(fsVolume);
} }
} }
@ -397,7 +358,7 @@ class FsVolumeList {
final List<IOException> exceptions = Collections.synchronizedList( final List<IOException> exceptions = Collections.synchronizedList(
new ArrayList<IOException>()); new ArrayList<IOException>());
List<Thread> blockPoolAddingThreads = new ArrayList<Thread>(); List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
for (final FsVolumeImpl v : volumes.get()) { for (final FsVolumeImpl v : volumes) {
Thread t = new Thread() { Thread t = new Thread() {
public void run() { public void run() {
try (FsVolumeReference ref = v.obtainReference()) { try (FsVolumeReference ref = v.obtainReference()) {
@ -438,13 +399,13 @@ class FsVolumeList {
void removeBlockPool(String bpid, Map<DatanodeStorage, BlockListAsLongs> void removeBlockPool(String bpid, Map<DatanodeStorage, BlockListAsLongs>
blocksPerVolume) { blocksPerVolume) {
for (FsVolumeImpl v : volumes.get()) { for (FsVolumeImpl v : volumes) {
v.shutdownBlockPool(bpid, blocksPerVolume.get(v.toDatanodeStorage())); v.shutdownBlockPool(bpid, blocksPerVolume.get(v.toDatanodeStorage()));
} }
} }
void shutdown() { void shutdown() {
for (FsVolumeImpl volume : volumes.get()) { for (FsVolumeImpl volume : volumes) {
if(volume != null) { if(volume != null) {
volume.shutdown(); volume.shutdown();
} }