HDFS-6833. DirectoryScanner should not register a deleting block with memory of DataNode. Contributed by Shinichi Yamashita

This commit is contained in:
Tsz-Wo Nicholas Sze 2015-03-13 02:25:32 +08:00
parent 06ce1d9a6c
commit 6dae6d12ec
8 changed files with 158 additions and 11 deletions

View File

@ -1139,6 +1139,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7830. DataNode does not release the volume lock when adding a volume
fails. (Lei Xu via Colin P. Mccabe)
HDFS-6833. DirectoryScanner should not register a deleting block with
memory of DataNode. (Shinichi Yamashita via szetszwo)
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

View File

@ -443,13 +443,14 @@ public class DirectoryScanner implements Runnable {
int d = 0; // index for blockpoolReport
int m = 0; // index for memReprot
while (m < memReport.length && d < blockpoolReport.length) {
FinalizedReplica memBlock = memReport[Math.min(m, memReport.length - 1)];
ScanInfo info = blockpoolReport[Math.min(
d, blockpoolReport.length - 1)];
FinalizedReplica memBlock = memReport[m];
ScanInfo info = blockpoolReport[d];
if (info.getBlockId() < memBlock.getBlockId()) {
// Block is missing in memory
statsRecord.missingMemoryBlocks++;
addDifference(diffRecord, statsRecord, info);
if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
// Block is missing in memory
statsRecord.missingMemoryBlocks++;
addDifference(diffRecord, statsRecord, info);
}
d++;
continue;
}
@ -495,8 +496,11 @@ public class DirectoryScanner implements Runnable {
current.getBlockId(), current.getVolume());
}
while (d < blockpoolReport.length) {
statsRecord.missingMemoryBlocks++;
addDifference(diffRecord, statsRecord, blockpoolReport[d++]);
if (!dataset.isDeletingBlock(bpid, blockpoolReport[d].getBlockId())) {
statsRecord.missingMemoryBlocks++;
addDifference(diffRecord, statsRecord, blockpoolReport[d]);
}
d++;
}
LOG.info(statsRecord.toString());
} //end for

View File

@ -543,4 +543,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* Check whether the block was pinned
*/
public boolean getPinning(ExtendedBlock block) throws IOException;
/**
* Confirm whether the block is deleting
*/
public boolean isDeletingBlock(String bpid, long blockId);
}

View File

@ -22,7 +22,10 @@ import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@ -64,9 +67,14 @@ class FsDatasetAsyncDiskService {
private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
private final DataNode datanode;
private final FsDatasetImpl fsdatasetImpl;
private final ThreadGroup threadGroup;
private Map<File, ThreadPoolExecutor> executors
= new HashMap<File, ThreadPoolExecutor>();
private Map<String, Set<Long>> deletedBlockIds
= new HashMap<String, Set<Long>>();
private static final int MAX_DELETED_BLOCKS = 64;
private int numDeletedBlocks = 0;
/**
* Create a AsyncDiskServices with a set of volumes (specified by their
@ -75,8 +83,9 @@ class FsDatasetAsyncDiskService {
* The AsyncDiskServices uses one ThreadPool per volume to do the async
* disk operations.
*/
FsDatasetAsyncDiskService(DataNode datanode) {
FsDatasetAsyncDiskService(DataNode datanode, FsDatasetImpl fsdatasetImpl) {
this.datanode = datanode;
this.fsdatasetImpl = fsdatasetImpl;
this.threadGroup = new ThreadGroup(getClass().getSimpleName());
}
@ -286,7 +295,27 @@ class FsDatasetAsyncDiskService {
LOG.info("Deleted " + block.getBlockPoolId() + " "
+ block.getLocalBlock() + " file " + blockFile);
}
updateDeletedBlockId(block);
IOUtils.cleanup(null, volumeRef);
}
}
private synchronized void updateDeletedBlockId(ExtendedBlock block) {
Set<Long> blockIds = deletedBlockIds.get(block.getBlockPoolId());
if (blockIds == null) {
blockIds = new HashSet<Long>();
deletedBlockIds.put(block.getBlockPoolId(), blockIds);
}
blockIds.add(block.getBlockId());
numDeletedBlocks++;
if (numDeletedBlocks == MAX_DELETED_BLOCKS) {
for (Entry<String, Set<Long>> e : deletedBlockIds.entrySet()) {
String bpid = e.getKey();
Set<Long> bs = e.getValue();
fsdatasetImpl.removeDeletedBlocks(bpid, bs);
bs.clear();
}
numDeletedBlocks = 0;
}
}
}

View File

@ -237,6 +237,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private volatile boolean fsRunning;
final ReplicaMap volumeMap;
final Map<String, Set<Long>> deletingBlock;
final RamDiskReplicaTracker ramDiskReplicaTracker;
final RamDiskAsyncLazyPersistService asyncLazyPersistService;
@ -298,8 +299,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
VolumeChoosingPolicy.class), conf);
volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
blockChooserImpl);
asyncDiskService = new FsDatasetAsyncDiskService(datanode);
asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
deletingBlock = new HashMap<String, Set<Long>>();
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
addVolume(dataLocations, storage.getStorageDir(idx));
@ -1795,7 +1797,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
+ ". Parent not found for file " + f);
continue;
}
volumeMap.remove(bpid, invalidBlks[i]);
ReplicaInfo removing = volumeMap.remove(bpid, invalidBlks[i]);
addDeletingBlock(bpid, removing.getBlockId());
if (LOG.isDebugEnabled()) {
LOG.debug("Block file " + removing.getBlockFile().getName()
+ " is to be deleted");
}
}
if (v.isTransientStorage()) {
@ -3005,5 +3012,35 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath()));
return fss.getPermission().getStickyBit();
}
@Override
public boolean isDeletingBlock(String bpid, long blockId) {
synchronized(deletingBlock) {
Set<Long> s = deletingBlock.get(bpid);
return s != null ? s.contains(blockId) : false;
}
}
public void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
synchronized (deletingBlock) {
Set<Long> s = deletingBlock.get(bpid);
if (s != null) {
for (Long id : blockIds) {
s.remove(id);
}
}
}
}
private void addDeletingBlock(String bpid, Long blockId) {
synchronized(deletingBlock) {
Set<Long> s = deletingBlock.get(bpid);
if (s == null) {
s = new HashSet<Long>();
deletingBlock.put(bpid, s);
}
s.add(blockId);
}
}
}

View File

@ -1318,5 +1318,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public boolean getPinning(ExtendedBlock b) throws IOException {
return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
}
@Override
public boolean isDeletingBlock(String bpid, long blockId) {
throw new UnsupportedOperationException();
}
}

View File

@ -429,4 +429,9 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
public boolean getPinning(ExtendedBlock block) throws IOException {
return false;
}
@Override
public boolean isDeletingBlock(String bpid, long blockId) {
return false;
}
}

View File

@ -18,10 +18,14 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
@ -29,8 +33,11 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
@ -89,6 +96,8 @@ public class TestFsDatasetImpl {
private DataStorage storage;
private FsDatasetImpl dataset;
private final static String BLOCKPOOL = "BP-TEST";
private static Storage.StorageDirectory createStorageDirectory(File root) {
Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
dsForStorageUuid.createStorageID(sd, false);
@ -334,4 +343,54 @@ public class TestFsDatasetImpl {
FsDatasetTestUtil.assertFileLockReleased(badDir.toString());
}
@Test
public void testDeletingBlocks() throws IOException {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build();
try {
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
FsVolumeImpl vol = ds.getVolumes().get(0);
ExtendedBlock eb;
ReplicaInfo info;
List<Block> blockList = new ArrayList<Block>();
for (int i = 1; i <= 63; i++) {
eb = new ExtendedBlock(BLOCKPOOL, i, 1, 1000 + i);
info = new FinalizedReplica(
eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
ds.volumeMap.add(BLOCKPOOL, info);
info.getBlockFile().createNewFile();
info.getMetaFile().createNewFile();
blockList.add(info);
}
ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Nothing to do
}
assertTrue(ds.isDeletingBlock(BLOCKPOOL, blockList.get(0).getBlockId()));
blockList.clear();
eb = new ExtendedBlock(BLOCKPOOL, 64, 1, 1064);
info = new FinalizedReplica(
eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
ds.volumeMap.add(BLOCKPOOL, info);
info.getBlockFile().createNewFile();
info.getMetaFile().createNewFile();
blockList.add(info);
ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Nothing to do
}
assertFalse(ds.isDeletingBlock(BLOCKPOOL, blockList.get(0).getBlockId()));
} finally {
cluster.shutdown();
}
}
}