svn merge -c 1301127 from trunk for HDFS-3005.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1301130 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8e5862e342
commit
0c90bd9fa3
|
@ -208,6 +208,8 @@ Release 0.23.3 - UNRELEASED
|
||||||
HDFS-3093. Fix bug where namenode -format interpreted the -force flag in
|
HDFS-3093. Fix bug where namenode -format interpreted the -force flag in
|
||||||
reverse. (todd)
|
reverse. (todd)
|
||||||
|
|
||||||
|
HDFS-3005. FSVolume.decDfsUsed(..) should be synchronized. (szetszwo)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-1623 SUBTASKS
|
BREAKDOWN OF HDFS-1623 SUBTASKS
|
||||||
|
|
||||||
HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)
|
HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)
|
||||||
|
|
|
@ -95,15 +95,18 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||||
* A node type that can be built into a tree reflecting the
|
* A node type that can be built into a tree reflecting the
|
||||||
* hierarchy of blocks on the local disk.
|
* hierarchy of blocks on the local disk.
|
||||||
*/
|
*/
|
||||||
private class FSDir {
|
private static class FSDir {
|
||||||
|
final int maxBlocksPerDir;
|
||||||
final File dir;
|
final File dir;
|
||||||
int numBlocks = 0;
|
int numBlocks = 0;
|
||||||
FSDir children[];
|
FSDir children[];
|
||||||
int lastChildIdx = 0;
|
int lastChildIdx = 0;
|
||||||
|
|
||||||
private FSDir(File dir)
|
private FSDir(File dir, int maxBlocksPerDir)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.dir = dir;
|
this.dir = dir;
|
||||||
|
this.maxBlocksPerDir = maxBlocksPerDir;
|
||||||
|
|
||||||
this.children = null;
|
this.children = null;
|
||||||
if (!dir.exists()) {
|
if (!dir.exists()) {
|
||||||
if (!dir.mkdirs()) {
|
if (!dir.mkdirs()) {
|
||||||
|
@ -115,7 +118,7 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||||
List<FSDir> dirList = new ArrayList<FSDir>();
|
List<FSDir> dirList = new ArrayList<FSDir>();
|
||||||
for (int idx = 0; idx < files.length; idx++) {
|
for (int idx = 0; idx < files.length; idx++) {
|
||||||
if (files[idx].isDirectory()) {
|
if (files[idx].isDirectory()) {
|
||||||
dirList.add(new FSDir(files[idx]));
|
dirList.add(new FSDir(files[idx], maxBlocksPerDir));
|
||||||
} else if (Block.isBlockFilename(files[idx])) {
|
} else if (Block.isBlockFilename(files[idx])) {
|
||||||
numBlocks++;
|
numBlocks++;
|
||||||
}
|
}
|
||||||
|
@ -165,7 +168,8 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||||
if (children == null || children.length == 0) {
|
if (children == null || children.length == 0) {
|
||||||
children = new FSDir[maxBlocksPerDir];
|
children = new FSDir[maxBlocksPerDir];
|
||||||
for (int idx = 0; idx < maxBlocksPerDir; idx++) {
|
for (int idx = 0; idx < maxBlocksPerDir; idx++) {
|
||||||
children[idx] = new FSDir(new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx));
|
final File sub = new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx);
|
||||||
|
children[idx] = new FSDir(sub, maxBlocksPerDir);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -297,8 +301,10 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||||
* A BlockPoolSlice represents a portion of a BlockPool stored on a volume.
|
* A BlockPoolSlice represents a portion of a BlockPool stored on a volume.
|
||||||
* Taken together, all BlockPoolSlices sharing a block pool ID across a
|
* Taken together, all BlockPoolSlices sharing a block pool ID across a
|
||||||
* cluster represent a single block pool.
|
* cluster represent a single block pool.
|
||||||
|
*
|
||||||
|
* This class is synchronized by {@link FSVolume}.
|
||||||
*/
|
*/
|
||||||
private class BlockPoolSlice {
|
private static class BlockPoolSlice {
|
||||||
private final String bpid;
|
private final String bpid;
|
||||||
private final FSVolume volume; // volume to which this BlockPool belongs to
|
private final FSVolume volume; // volume to which this BlockPool belongs to
|
||||||
private final File currentDir; // StorageDirectory/current/bpid/current
|
private final File currentDir; // StorageDirectory/current/bpid/current
|
||||||
|
@ -335,10 +341,16 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||||
FileUtil.fullyDelete(tmpDir);
|
FileUtil.fullyDelete(tmpDir);
|
||||||
}
|
}
|
||||||
this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
|
this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
|
||||||
|
final boolean supportAppends = conf.getBoolean(
|
||||||
|
DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
|
||||||
|
DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
|
||||||
if (rbwDir.exists() && !supportAppends) {
|
if (rbwDir.exists() && !supportAppends) {
|
||||||
FileUtil.fullyDelete(rbwDir);
|
FileUtil.fullyDelete(rbwDir);
|
||||||
}
|
}
|
||||||
this.finalizedDir = new FSDir(finalizedDir);
|
final int maxBlocksPerDir = conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
|
||||||
|
this.finalizedDir = new FSDir(finalizedDir, maxBlocksPerDir);
|
||||||
if (!rbwDir.mkdirs()) { // create rbw directory if not exist
|
if (!rbwDir.mkdirs()) { // create rbw directory if not exist
|
||||||
if (!rbwDir.isDirectory()) {
|
if (!rbwDir.isDirectory()) {
|
||||||
throw new IOException("Mkdirs failed to create " + rbwDir.toString());
|
throw new IOException("Mkdirs failed to create " + rbwDir.toString());
|
||||||
|
@ -365,12 +377,12 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||||
return rbwDir;
|
return rbwDir;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This should be used only by {@link FSVolume#decDfsUsed(String, long)}
|
||||||
|
* and it will be synchronized there.
|
||||||
|
*/
|
||||||
void decDfsUsed(long value) {
|
void decDfsUsed(long value) {
|
||||||
// The caller to this method (BlockFileDeleteTask.run()) does
|
dfsUsage.decDfsUsed(value);
|
||||||
// not have locked FSDataset.this yet.
|
|
||||||
synchronized(FSDataset.this) {
|
|
||||||
dfsUsage.decDfsUsed(value);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
long getDfsUsed() throws IOException {
|
long getDfsUsed() throws IOException {
|
||||||
|
@ -530,14 +542,22 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||||
dfsUsage.shutdown();
|
dfsUsage.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class FSVolume implements FSVolumeInterface {
|
/**
|
||||||
|
* The underlying volume used to store replica.
|
||||||
|
*
|
||||||
|
* It uses the {@link FSDataset} object for synchronization.
|
||||||
|
*/
|
||||||
|
static class FSVolume implements FSVolumeInterface {
|
||||||
|
private final FSDataset dataset;
|
||||||
private final Map<String, BlockPoolSlice> map = new HashMap<String, BlockPoolSlice>();
|
private final Map<String, BlockPoolSlice> map = new HashMap<String, BlockPoolSlice>();
|
||||||
private final File currentDir; // <StorageDirectory>/current
|
private final File currentDir; // <StorageDirectory>/current
|
||||||
private final DF usage;
|
private final DF usage;
|
||||||
private final long reserved;
|
private final long reserved;
|
||||||
|
|
||||||
FSVolume(File currentDir, Configuration conf) throws IOException {
|
FSVolume(FSDataset dataset, File currentDir, Configuration conf
|
||||||
|
) throws IOException {
|
||||||
|
this.dataset = dataset;
|
||||||
this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
|
this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
|
||||||
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
|
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
|
||||||
this.currentDir = currentDir;
|
this.currentDir = currentDir;
|
||||||
|
@ -555,9 +575,7 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||||
}
|
}
|
||||||
|
|
||||||
void decDfsUsed(String bpid, long value) {
|
void decDfsUsed(String bpid, long value) {
|
||||||
// The caller to this method (BlockFileDeleteTask.run()) does
|
synchronized(dataset) {
|
||||||
// not have locked FSDataset.this yet.
|
|
||||||
synchronized(FSDataset.this) {
|
|
||||||
BlockPoolSlice bp = map.get(bpid);
|
BlockPoolSlice bp = map.get(bpid);
|
||||||
if (bp != null) {
|
if (bp != null) {
|
||||||
bp.decDfsUsed(value);
|
bp.decDfsUsed(value);
|
||||||
|
@ -566,11 +584,11 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||||
}
|
}
|
||||||
|
|
||||||
long getDfsUsed() throws IOException {
|
long getDfsUsed() throws IOException {
|
||||||
// TODO valid synchronization
|
|
||||||
long dfsUsed = 0;
|
long dfsUsed = 0;
|
||||||
Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
|
synchronized(dataset) {
|
||||||
for (Entry<String, BlockPoolSlice> entry : set) {
|
for(BlockPoolSlice s : map.values()) {
|
||||||
dfsUsed += entry.getValue().getDfsUsed();
|
dfsUsed += s.getDfsUsed();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return dfsUsed;
|
return dfsUsed;
|
||||||
}
|
}
|
||||||
|
@ -630,11 +648,11 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public String[] getBlockPoolList() {
|
public String[] getBlockPoolList() {
|
||||||
synchronized(FSDataset.this) {
|
synchronized(dataset) {
|
||||||
return map.keySet().toArray(new String[map.keySet().size()]);
|
return map.keySet().toArray(new String[map.keySet().size()]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Temporary files. They get moved to the finalized block directory when
|
* Temporary files. They get moved to the finalized block directory when
|
||||||
* the block is finalized.
|
* the block is finalized.
|
||||||
|
@ -658,14 +676,17 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||||
return bp.addBlock(b, f);
|
return bp.addBlock(b, f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This should be used only by {@link FSVolumeSet#checkDirs()}
|
||||||
|
* and it will be synchronized there.
|
||||||
|
*/
|
||||||
void checkDirs() throws DiskErrorException {
|
void checkDirs() throws DiskErrorException {
|
||||||
// TODO:FEDERATION valid synchronization
|
// TODO:FEDERATION valid synchronization
|
||||||
Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
|
for(BlockPoolSlice s : map.values()) {
|
||||||
for (Entry<String, BlockPoolSlice> entry : set) {
|
s.checkDirs();
|
||||||
entry.getValue().checkDirs();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void getVolumeMap(ReplicasMap volumeMap) throws IOException {
|
void getVolumeMap(ReplicasMap volumeMap) throws IOException {
|
||||||
Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
|
Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
|
||||||
for (Entry<String, BlockPoolSlice> entry : set) {
|
for (Entry<String, BlockPoolSlice> entry : set) {
|
||||||
|
@ -877,31 +898,25 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||||
// Make a copy of volumes for performing modification
|
// Make a copy of volumes for performing modification
|
||||||
final List<FSVolume> volumeList = new ArrayList<FSVolume>(volumes);
|
final List<FSVolume> volumeList = new ArrayList<FSVolume>(volumes);
|
||||||
|
|
||||||
for (int idx = 0; idx < volumeList.size(); idx++) {
|
for(Iterator<FSVolume> i = volumeList.iterator(); i.hasNext(); ) {
|
||||||
FSVolume fsv = volumeList.get(idx);
|
final FSVolume fsv = i.next();
|
||||||
try {
|
try {
|
||||||
fsv.checkDirs();
|
fsv.checkDirs();
|
||||||
} catch (DiskErrorException e) {
|
} catch (DiskErrorException e) {
|
||||||
DataNode.LOG.warn("Removing failed volume " + fsv + ": ",e);
|
DataNode.LOG.warn("Removing failed volume " + fsv + ": ",e);
|
||||||
if (removedVols == null) {
|
if (removedVols == null) {
|
||||||
removedVols = new ArrayList<FSVolume>(1);
|
removedVols = new ArrayList<FSVolume>(2);
|
||||||
}
|
}
|
||||||
removedVols.add(fsv);
|
removedVols.add(fsv);
|
||||||
fsv.shutdown();
|
fsv.shutdown();
|
||||||
volumeList.set(idx, null); // Remove the volume
|
i.remove(); // Remove the volume
|
||||||
numFailedVolumes++;
|
numFailedVolumes++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove null volumes from the volumes array
|
|
||||||
if (removedVols != null && removedVols.size() > 0) {
|
if (removedVols != null && removedVols.size() > 0) {
|
||||||
final List<FSVolume> newVols = new ArrayList<FSVolume>();
|
// Replace volume list
|
||||||
for (FSVolume vol : volumeList) {
|
volumes = Collections.unmodifiableList(volumeList);
|
||||||
if (vol != null) {
|
|
||||||
newVols.add(vol);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
volumes = Collections.unmodifiableList(newVols); // Replace volume list
|
|
||||||
DataNode.LOG.info("Completed FSVolumeSet.checkDirs. Removed "
|
DataNode.LOG.info("Completed FSVolumeSet.checkDirs. Removed "
|
||||||
+ removedVols.size() + " volumes. List of current volumes: "
|
+ removedVols.size() + " volumes. List of current volumes: "
|
||||||
+ this);
|
+ this);
|
||||||
|
@ -1048,7 +1063,6 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||||
|
|
||||||
private final DataNode datanode;
|
private final DataNode datanode;
|
||||||
final FSVolumeSet volumes;
|
final FSVolumeSet volumes;
|
||||||
private final int maxBlocksPerDir;
|
|
||||||
final ReplicasMap volumeMap;
|
final ReplicasMap volumeMap;
|
||||||
final FSDatasetAsyncDiskService asyncDiskService;
|
final FSDatasetAsyncDiskService asyncDiskService;
|
||||||
private final int validVolsRequired;
|
private final int validVolsRequired;
|
||||||
|
@ -1056,20 +1070,12 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||||
// Used for synchronizing access to usage stats
|
// Used for synchronizing access to usage stats
|
||||||
private final Object statsLock = new Object();
|
private final Object statsLock = new Object();
|
||||||
|
|
||||||
final boolean supportAppends;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An FSDataset has a directory where it loads its data files.
|
* An FSDataset has a directory where it loads its data files.
|
||||||
*/
|
*/
|
||||||
private FSDataset(DataNode datanode, DataStorage storage, Configuration conf
|
private FSDataset(DataNode datanode, DataStorage storage, Configuration conf
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
this.datanode = datanode;
|
this.datanode = datanode;
|
||||||
this.maxBlocksPerDir =
|
|
||||||
conf.getInt(DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
|
|
||||||
DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
|
|
||||||
this.supportAppends =
|
|
||||||
conf.getBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
|
|
||||||
DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
|
|
||||||
// The number of volumes required for operation is the total number
|
// The number of volumes required for operation is the total number
|
||||||
// of volumes minus the number of failed volumes we can tolerate.
|
// of volumes minus the number of failed volumes we can tolerate.
|
||||||
final int volFailuresTolerated =
|
final int volFailuresTolerated =
|
||||||
|
@ -1098,7 +1104,7 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||||
storage.getNumStorageDirs());
|
storage.getNumStorageDirs());
|
||||||
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
||||||
final File dir = storage.getStorageDir(idx).getCurrentDir();
|
final File dir = storage.getStorageDir(idx).getCurrentDir();
|
||||||
volArray.add(new FSVolume(dir, conf));
|
volArray.add(new FSVolume(this, dir, conf));
|
||||||
DataNode.LOG.info("FSDataset added volume - " + dir);
|
DataNode.LOG.info("FSDataset added volume - " + dir);
|
||||||
}
|
}
|
||||||
volumeMap = new ReplicasMap(this);
|
volumeMap = new ReplicasMap(this);
|
||||||
|
|
Loading…
Reference in New Issue