HDFS-16043. Add markedDeleteBlockScrubberThread to delete blocks asynchronously (#3063). Contributed by Xiangyi Zhu.
Reviewed-by: tomscut <litao@bigo.sg> Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org> Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
parent
7542677470
commit
034dc8d03c
|
@ -47,6 +47,7 @@ import java.util.concurrent.ExecutionException;
|
|||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import javax.management.ObjectName;
|
||||
|
@ -191,6 +192,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
private volatile long lowRedundancyBlocksCount = 0L;
|
||||
private volatile long scheduledReplicationBlocksCount = 0L;
|
||||
|
||||
private final long deleteBlockLockTimeMs = 500;
|
||||
private final long deleteBlockUnlockIntervalTimeMs = 100;
|
||||
|
||||
/** flag indicating whether replication queues have been initialized */
|
||||
private boolean initializedReplQueues;
|
||||
|
||||
|
@ -324,6 +328,12 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
* {@link #redundancyThread} has run at least one full iteration.
|
||||
*/
|
||||
private final AtomicLong lastRedundancyCycleTS = new AtomicLong(-1);
|
||||
/**
|
||||
* markedDeleteBlockScrubber thread for handling async delete blocks.
|
||||
*/
|
||||
private final Daemon markedDeleteBlockScrubberThread =
|
||||
new Daemon(new MarkedDeleteBlockScrubber());
|
||||
|
||||
/** Block report thread for handling async reports. */
|
||||
private final BlockReportProcessingThread blockReportThread;
|
||||
|
||||
|
@ -422,6 +432,12 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
*/
|
||||
private int numBlocksPerIteration;
|
||||
|
||||
/**
|
||||
* The blocks of deleted files are put into the queue,
|
||||
* and the cleanup thread processes these blocks periodically.
|
||||
*/
|
||||
private final ConcurrentLinkedQueue<List<BlockInfo>> markedDeleteQueue;
|
||||
|
||||
/**
|
||||
* Progress of the Reconstruction queues initialisation.
|
||||
*/
|
||||
|
@ -475,7 +491,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
datanodeManager.getBlockInvalidateLimit(),
|
||||
startupDelayBlockDeletionInMs,
|
||||
blockIdManager);
|
||||
|
||||
markedDeleteQueue = new ConcurrentLinkedQueue<>();
|
||||
// Compute the map capacity by allocating 2% of total memory
|
||||
blocksMap = new BlocksMap(
|
||||
LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
|
||||
|
@ -725,6 +741,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
datanodeManager.activate(conf);
|
||||
this.redundancyThread.setName("RedundancyMonitor");
|
||||
this.redundancyThread.start();
|
||||
this.markedDeleteBlockScrubberThread.
|
||||
setName("MarkedDeleteBlockScrubberThread");
|
||||
this.markedDeleteBlockScrubberThread.start();
|
||||
this.blockReportThread.start();
|
||||
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
|
||||
bmSafeMode.activate(blockTotal);
|
||||
|
@ -738,8 +757,10 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
try {
|
||||
redundancyThread.interrupt();
|
||||
blockReportThread.interrupt();
|
||||
markedDeleteBlockScrubberThread.interrupt();
|
||||
redundancyThread.join(3000);
|
||||
blockReportThread.join(3000);
|
||||
markedDeleteBlockScrubberThread.join(3000);
|
||||
} catch (InterruptedException ie) {
|
||||
}
|
||||
datanodeManager.close();
|
||||
|
@ -4911,6 +4932,77 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
return lastRedundancyCycleTS.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Periodically deletes the marked block.
|
||||
*/
|
||||
private class MarkedDeleteBlockScrubber implements Runnable {
|
||||
private Iterator<BlockInfo> toDeleteIterator = null;
|
||||
private boolean isSleep;
|
||||
private NameNodeMetrics metrics;
|
||||
|
||||
private void remove(long time) {
|
||||
if (checkToDeleteIterator()) {
|
||||
namesystem.writeLock();
|
||||
try {
|
||||
while (toDeleteIterator.hasNext()) {
|
||||
removeBlock(toDeleteIterator.next());
|
||||
metrics.decrPendingDeleteBlocksCount();
|
||||
if (Time.monotonicNow() - time > deleteBlockLockTimeMs) {
|
||||
isSleep = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean checkToDeleteIterator() {
|
||||
return toDeleteIterator != null && toDeleteIterator.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Start MarkedDeleteBlockScrubber thread");
|
||||
while (namesystem.isRunning() &&
|
||||
!Thread.currentThread().isInterrupted()) {
|
||||
if (!markedDeleteQueue.isEmpty() || checkToDeleteIterator()) {
|
||||
try {
|
||||
metrics = NameNode.getNameNodeMetrics();
|
||||
metrics.setDeleteBlocksQueued(markedDeleteQueue.size());
|
||||
isSleep = false;
|
||||
long startTime = Time.monotonicNow();
|
||||
remove(startTime);
|
||||
while (!isSleep && !markedDeleteQueue.isEmpty() &&
|
||||
!Thread.currentThread().isInterrupted()) {
|
||||
List<BlockInfo> markedDeleteList = markedDeleteQueue.poll();
|
||||
if (markedDeleteList != null) {
|
||||
toDeleteIterator = markedDeleteList.listIterator();
|
||||
}
|
||||
remove(startTime);
|
||||
}
|
||||
} catch (Exception e){
|
||||
LOG.warn("MarkedDeleteBlockScrubber encountered an exception" +
|
||||
" during the block deletion process, " +
|
||||
" the deletion of the block will retry in {} millisecond.",
|
||||
deleteBlockUnlockIntervalTimeMs, e);
|
||||
}
|
||||
}
|
||||
if (isSleep) {
|
||||
LOG.debug("Clear markedDeleteQueue over {}" +
|
||||
" millisecond to release the write lock", deleteBlockLockTimeMs);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(deleteBlockUnlockIntervalTimeMs);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Stopping MarkedDeleteBlockScrubber.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Periodically calls computeBlockRecoveryWork().
|
||||
*/
|
||||
|
@ -5259,6 +5351,17 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
return blockIdManager;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ConcurrentLinkedQueue<List<BlockInfo>> getMarkedDeleteQueue() {
|
||||
return markedDeleteQueue;
|
||||
}
|
||||
|
||||
public void addBLocksToMarkedDeleteQueue(List<BlockInfo> blockInfos) {
|
||||
markedDeleteQueue.add(blockInfos);
|
||||
NameNode.getNameNodeMetrics().
|
||||
incrPendingDeleteBlocksCount(blockInfos.size());
|
||||
}
|
||||
|
||||
public long nextGenerationStamp(boolean legacyBlock) throws IOException {
|
||||
return blockIdManager.nextGenerationStamp(legacyBlock);
|
||||
}
|
||||
|
|
|
@ -2373,8 +2373,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
}
|
||||
getEditLog().logSync();
|
||||
if (!toRemoveBlocks.getToDeleteList().isEmpty()) {
|
||||
removeBlocks(toRemoveBlocks);
|
||||
toRemoveBlocks.clear();
|
||||
blockManager.addBLocksToMarkedDeleteQueue(
|
||||
toRemoveBlocks.getToDeleteList());
|
||||
}
|
||||
logAuditEvent(true, operationName, src, null, status);
|
||||
} catch (AccessControlException e) {
|
||||
|
@ -2821,8 +2821,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
if (!skipSync) {
|
||||
getEditLog().logSync();
|
||||
if (toRemoveBlocks != null) {
|
||||
removeBlocks(toRemoveBlocks);
|
||||
toRemoveBlocks.clear();
|
||||
blockManager.addBLocksToMarkedDeleteQueue(
|
||||
toRemoveBlocks.getToDeleteList());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3345,8 +3345,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
assert res != null;
|
||||
BlocksMapUpdateInfo collectedBlocks = res.collectedBlocks;
|
||||
if (!collectedBlocks.getToDeleteList().isEmpty()) {
|
||||
removeBlocks(collectedBlocks);
|
||||
collectedBlocks.clear();
|
||||
blockManager.addBLocksToMarkedDeleteQueue(
|
||||
collectedBlocks.getToDeleteList());
|
||||
}
|
||||
|
||||
logAuditEvent(true, operationName + " (options=" +
|
||||
|
@ -3385,7 +3385,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
getEditLog().logSync();
|
||||
logAuditEvent(ret, operationName, src);
|
||||
if (toRemovedBlocks != null) {
|
||||
removeBlocks(toRemovedBlocks); // Incremental deletion of blocks
|
||||
blockManager.addBLocksToMarkedDeleteQueue(
|
||||
toRemovedBlocks.getToDeleteList());
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -3395,30 +3396,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
return dir.getPermissionChecker();
|
||||
}
|
||||
|
||||
/**
|
||||
* From the given list, incrementally remove the blocks from blockManager
|
||||
* Writelock is dropped and reacquired every blockDeletionIncrement to
|
||||
* ensure that other waiters on the lock can get in. See HDFS-2938
|
||||
*
|
||||
* @param blocks
|
||||
* An instance of {@link BlocksMapUpdateInfo} which contains a list
|
||||
* of blocks that need to be removed from blocksMap
|
||||
*/
|
||||
void removeBlocks(BlocksMapUpdateInfo blocks) {
|
||||
List<BlockInfo> toDeleteList = blocks.getToDeleteList();
|
||||
Iterator<BlockInfo> iter = toDeleteList.iterator();
|
||||
while (iter.hasNext()) {
|
||||
writeLock();
|
||||
try {
|
||||
for (int i = 0; i < blockDeletionIncrement && iter.hasNext(); i++) {
|
||||
blockManager.removeBlock(iter.next());
|
||||
}
|
||||
} finally {
|
||||
writeUnlock("removeBlocks");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove leases and inodes related to a given path
|
||||
* @param removedUCFiles INodes whose leases need to be released
|
||||
|
@ -4627,7 +4604,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
INodesInPath.fromINode((INodeFile) bc), false);
|
||||
changed |= toRemoveBlocks != null;
|
||||
if (toRemoveBlocks != null) {
|
||||
removeBlocks(toRemoveBlocks); // Incremental deletion of blocks
|
||||
blockManager.addBLocksToMarkedDeleteQueue(
|
||||
toRemoveBlocks.getToDeleteList());
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
@ -7338,7 +7316,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
// Breaking the pattern as removing blocks have to happen outside of the
|
||||
// global lock
|
||||
if (blocksToBeDeleted != null) {
|
||||
removeBlocks(blocksToBeDeleted);
|
||||
blockManager.addBLocksToMarkedDeleteQueue(
|
||||
blocksToBeDeleted.getToDeleteList());
|
||||
}
|
||||
logAuditEvent(true, operationName, rootPath, null, null);
|
||||
}
|
||||
|
@ -7364,7 +7343,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
} finally {
|
||||
writeUnlock(operationName, getLockReportInfoSupplier(rootPath));
|
||||
}
|
||||
removeBlocks(blocksToBeDeleted);
|
||||
blockManager.addBLocksToMarkedDeleteQueue(
|
||||
blocksToBeDeleted.getToDeleteList());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -89,6 +89,10 @@ public class NameNodeMetrics {
|
|||
MutableCounterLong blockOpsBatched;
|
||||
@Metric("Number of pending edits")
|
||||
MutableGaugeInt pendingEditsCount;
|
||||
@Metric("Number of delete blocks Queued")
|
||||
MutableGaugeInt deleteBlocksQueued;
|
||||
@Metric("Number of pending deletion blocks")
|
||||
MutableGaugeInt pendingDeleteBlocksCount;
|
||||
|
||||
@Metric("Number of file system operations")
|
||||
public long totalFileOps(){
|
||||
|
@ -341,6 +345,18 @@ public class NameNodeMetrics {
|
|||
blockOpsQueued.set(size);
|
||||
}
|
||||
|
||||
public void setDeleteBlocksQueued(int size) {
|
||||
deleteBlocksQueued.set(size);
|
||||
}
|
||||
|
||||
public void incrPendingDeleteBlocksCount(int size) {
|
||||
pendingDeleteBlocksCount.incr(size);
|
||||
}
|
||||
|
||||
public void decrPendingDeleteBlocksCount() {
|
||||
pendingDeleteBlocksCount.decr();
|
||||
}
|
||||
|
||||
public void addBlockOpsBatched(int count) {
|
||||
blockOpsBatched.incr(count);
|
||||
}
|
||||
|
|
|
@ -190,6 +190,8 @@ public class TestBlocksScheduledCounter {
|
|||
|
||||
// 4. delete the file
|
||||
dfs.delete(filePath, true);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
int blocksScheduled = 0;
|
||||
for (DatanodeDescriptor descriptor : dnList) {
|
||||
if (descriptor.getBlocksScheduled() != 0) {
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.fs.Options.Rename;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
@ -161,6 +162,8 @@ public class TestDFSRename {
|
|||
assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock().
|
||||
getLocalBlock()) != null);
|
||||
dfs.rename(srcPath, dstPath, Rename.OVERWRITE);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock().
|
||||
getLocalBlock()) == null);
|
||||
|
||||
|
|
|
@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||
|
@ -1356,6 +1357,8 @@ public class TestFileCreation {
|
|||
assertBlocks(bm, oldBlocks, true);
|
||||
|
||||
out = dfs.create(filePath, true);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
byte[] newData = AppendTestUtil.randomBytes(seed, fileSize);
|
||||
try {
|
||||
out.write(newData);
|
||||
|
@ -1363,6 +1366,8 @@ public class TestFileCreation {
|
|||
out.close();
|
||||
}
|
||||
dfs.deleteOnExit(filePath);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
|
||||
LocatedBlocks newBlocks = NameNodeAdapter.getBlockLocations(
|
||||
nn, file, 0, fileSize);
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
|
@ -127,7 +128,7 @@ public class TestReadStripedFileWithDecoding {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidateBlock() throws IOException {
|
||||
public void testInvalidateBlock() throws IOException, InterruptedException {
|
||||
final Path file = new Path("/invalidate");
|
||||
final int length = 10;
|
||||
final byte[] bytes = StripedFileTestUtil.generateBytes(length);
|
||||
|
@ -151,6 +152,8 @@ public class TestReadStripedFileWithDecoding {
|
|||
try {
|
||||
// delete the file
|
||||
dfs.delete(file, true);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem().getBlockManager());
|
||||
// check the block is added to invalidateBlocks
|
||||
final FSNamesystem fsn = cluster.getNamesystem();
|
||||
final BlockManager bm = fsn.getBlockManager();
|
||||
|
|
|
@ -39,6 +39,9 @@ import org.junit.Assert;
|
|||
import org.apache.hadoop.util.Preconditions;
|
||||
|
||||
public class BlockManagerTestUtil {
|
||||
|
||||
static final long SLEEP_TIME = 1000;
|
||||
|
||||
public static void setNodeReplicationLimit(final BlockManager blockManager,
|
||||
final int limit) {
|
||||
blockManager.maxReplicationStreams = limit;
|
||||
|
@ -178,7 +181,20 @@ public class BlockManagerTestUtil {
|
|||
*/
|
||||
public static CorruptReplicasMap getCorruptReplicas(final BlockManager blockManager){
|
||||
return blockManager.corruptReplicas;
|
||||
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the processing of the marked deleted block to complete.
|
||||
*/
|
||||
public static void waitForMarkedDeleteQueueIsEmpty(
|
||||
BlockManager blockManager) throws InterruptedException {
|
||||
while (true) {
|
||||
if (blockManager.getMarkedDeleteQueue().isEmpty()) {
|
||||
return;
|
||||
}
|
||||
Thread.sleep(SLEEP_TIME);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -253,6 +253,8 @@ public class TestComputeInvalidateWork {
|
|||
}
|
||||
dfs.delete(path, false);
|
||||
dfs.delete(ecFile, false);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
namesystem.writeLock();
|
||||
InvalidateBlocks invalidateBlocks;
|
||||
int totalStripedDataBlocks = totalBlockGroups * (ecPolicy.getNumDataUnits()
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
@ -175,6 +176,8 @@ public class TestLazyPersistLockedMemory extends LazyPersistTestCase {
|
|||
|
||||
// Delete the file and ensure locked RAM goes to zero.
|
||||
fs.delete(path, false);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
|
||||
waitForLockedBytesUsed(fsd, 0);
|
||||
}
|
||||
|
|
|
@ -445,6 +445,8 @@ public class TestDecommissioningStatus {
|
|||
// Delete the under-replicated file, which should let the
|
||||
// DECOMMISSION_IN_PROGRESS node become DECOMMISSIONED
|
||||
AdminStatesBaseTest.cleanupFile(fileSys, f);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
BlockManagerTestUtil.recheckDecommissionState(dm);
|
||||
// Block until the admin's monitor updates the number of tracked nodes.
|
||||
waitForDecommissionedNodes(dm.getDatanodeAdminManager(), 0);
|
||||
|
|
|
@ -33,6 +33,7 @@ import static org.junit.Assert.fail;
|
|||
import java.io.IOException;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
|
@ -319,7 +320,8 @@ public class TestFileTruncate {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSnapshotWithAppendTruncate() throws IOException {
|
||||
public void testSnapshotWithAppendTruncate()
|
||||
throws IOException, InterruptedException {
|
||||
testSnapshotWithAppendTruncate(0, 1, 2);
|
||||
testSnapshotWithAppendTruncate(0, 2, 1);
|
||||
testSnapshotWithAppendTruncate(1, 0, 2);
|
||||
|
@ -333,7 +335,8 @@ public class TestFileTruncate {
|
|||
* Delete snapshots in the specified order and verify that
|
||||
* remaining snapshots are still readable.
|
||||
*/
|
||||
void testSnapshotWithAppendTruncate(int ... deleteOrder) throws IOException {
|
||||
void testSnapshotWithAppendTruncate(int... deleteOrder)
|
||||
throws IOException, InterruptedException {
|
||||
FSDirectory fsDir = cluster.getNamesystem().getFSDirectory();
|
||||
fs.mkdirs(parent);
|
||||
fs.setQuota(parent, 100, 1000);
|
||||
|
@ -381,16 +384,16 @@ public class TestFileTruncate {
|
|||
// Truncate to block boundary
|
||||
int newLength = length[0] + BLOCK_SIZE / 2;
|
||||
boolean isReady = fs.truncate(src, newLength);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
assertTrue("Recovery is not expected.", isReady);
|
||||
assertFileLength(snapshotFiles[2], length[2]);
|
||||
assertFileLength(snapshotFiles[1], length[1]);
|
||||
assertFileLength(snapshotFiles[0], length[0]);
|
||||
assertBlockNotPresent(appendedBlk);
|
||||
|
||||
// Diskspace consumed should be 16 bytes * 3. [blk 1,2,3 SS:4]
|
||||
contentSummary = fs.getContentSummary(parent);
|
||||
assertThat(contentSummary.getSpaceConsumed(), is(48L));
|
||||
|
||||
// Truncate full block again
|
||||
newLength = length[0] - BLOCK_SIZE / 2;
|
||||
isReady = fs.truncate(src, newLength);
|
||||
|
@ -398,11 +401,9 @@ public class TestFileTruncate {
|
|||
assertFileLength(snapshotFiles[2], length[2]);
|
||||
assertFileLength(snapshotFiles[1], length[1]);
|
||||
assertFileLength(snapshotFiles[0], length[0]);
|
||||
|
||||
// Diskspace consumed should be 16 bytes * 3. [blk 1,2 SS:3,4]
|
||||
contentSummary = fs.getContentSummary(parent);
|
||||
assertThat(contentSummary.getSpaceConsumed(), is(48L));
|
||||
|
||||
// Truncate half of the last block
|
||||
newLength -= BLOCK_SIZE / 2;
|
||||
isReady = fs.truncate(src, newLength);
|
||||
|
@ -413,15 +414,12 @@ public class TestFileTruncate {
|
|||
assertFileLength(snapshotFiles[0], length[0]);
|
||||
Block replacedBlk = getLocatedBlocks(src).getLastLocatedBlock()
|
||||
.getBlock().getLocalBlock();
|
||||
|
||||
// Diskspace consumed should be 16 bytes * 3. [blk 1,6 SS:2,3,4]
|
||||
contentSummary = fs.getContentSummary(parent);
|
||||
assertThat(contentSummary.getSpaceConsumed(), is(54L));
|
||||
|
||||
snapshotDir = fs.createSnapshot(parent, ss[3]);
|
||||
snapshotFiles[3] = new Path(snapshotDir, truncateFile);
|
||||
length[3] = newLength;
|
||||
|
||||
// Delete file. Should still be able to read snapshots
|
||||
int numINodes = fsDir.getInodeMapSize();
|
||||
isReady = fs.delete(src, false);
|
||||
|
@ -432,17 +430,15 @@ public class TestFileTruncate {
|
|||
assertFileLength(snapshotFiles[0], length[0]);
|
||||
assertEquals("Number of INodes should not change",
|
||||
numINodes, fsDir.getInodeMapSize());
|
||||
|
||||
fs.deleteSnapshot(parent, ss[3]);
|
||||
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
assertBlockExists(firstBlk);
|
||||
assertBlockExists(lastBlk);
|
||||
assertBlockNotPresent(replacedBlk);
|
||||
|
||||
// Diskspace consumed should be 16 bytes * 3. [SS:1,2,3,4]
|
||||
contentSummary = fs.getContentSummary(parent);
|
||||
assertThat(contentSummary.getSpaceConsumed(), is(48L));
|
||||
|
||||
// delete snapshots in the specified order
|
||||
fs.deleteSnapshot(parent, ss[deleteOrder[0]]);
|
||||
assertFileLength(snapshotFiles[deleteOrder[1]], length[deleteOrder[1]]);
|
||||
|
@ -451,12 +447,12 @@ public class TestFileTruncate {
|
|||
assertBlockExists(lastBlk);
|
||||
assertEquals("Number of INodes should not change",
|
||||
numINodes, fsDir.getInodeMapSize());
|
||||
|
||||
// Diskspace consumed should be 16 bytes * 3. [SS:1,2,3,4]
|
||||
contentSummary = fs.getContentSummary(parent);
|
||||
assertThat(contentSummary.getSpaceConsumed(), is(48L));
|
||||
|
||||
fs.deleteSnapshot(parent, ss[deleteOrder[1]]);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]);
|
||||
assertBlockExists(firstBlk);
|
||||
contentSummary = fs.getContentSummary(parent);
|
||||
|
@ -470,11 +466,11 @@ public class TestFileTruncate {
|
|||
}
|
||||
assertEquals("Number of INodes should not change",
|
||||
numINodes, fsDir .getInodeMapSize());
|
||||
|
||||
fs.deleteSnapshot(parent, ss[deleteOrder[2]]);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
assertBlockNotPresent(firstBlk);
|
||||
assertBlockNotPresent(lastBlk);
|
||||
|
||||
// Diskspace consumed should be 0 bytes * 3. []
|
||||
contentSummary = fs.getContentSummary(parent);
|
||||
assertThat(contentSummary.getSpaceConsumed(), is(0L));
|
||||
|
@ -488,7 +484,8 @@ public class TestFileTruncate {
|
|||
* remaining snapshots are still readable.
|
||||
*/
|
||||
@Test
|
||||
public void testSnapshotWithTruncates() throws IOException {
|
||||
public void testSnapshotWithTruncates()
|
||||
throws IOException, InterruptedException {
|
||||
testSnapshotWithTruncates(0, 1, 2);
|
||||
testSnapshotWithTruncates(0, 2, 1);
|
||||
testSnapshotWithTruncates(1, 0, 2);
|
||||
|
@ -497,7 +494,8 @@ public class TestFileTruncate {
|
|||
testSnapshotWithTruncates(2, 1, 0);
|
||||
}
|
||||
|
||||
void testSnapshotWithTruncates(int ... deleteOrder) throws IOException {
|
||||
void testSnapshotWithTruncates(int... deleteOrder)
|
||||
throws IOException, InterruptedException {
|
||||
fs.mkdirs(parent);
|
||||
fs.setQuota(parent, 100, 1000);
|
||||
fs.allowSnapshot(parent);
|
||||
|
@ -544,6 +542,8 @@ public class TestFileTruncate {
|
|||
assertThat(contentSummary.getSpaceConsumed(), is(42L));
|
||||
|
||||
fs.deleteSnapshot(parent, ss[deleteOrder[0]]);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
assertFileLength(snapshotFiles[deleteOrder[1]], length[deleteOrder[1]]);
|
||||
assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]);
|
||||
assertFileLength(src, length[2]);
|
||||
|
@ -561,6 +561,8 @@ public class TestFileTruncate {
|
|||
}
|
||||
|
||||
fs.deleteSnapshot(parent, ss[deleteOrder[1]]);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]);
|
||||
assertFileLength(src, length[2]);
|
||||
assertBlockExists(firstBlk);
|
||||
|
@ -581,6 +583,8 @@ public class TestFileTruncate {
|
|||
}
|
||||
|
||||
fs.deleteSnapshot(parent, ss[deleteOrder[2]]);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
assertFileLength(src, length[2]);
|
||||
assertBlockExists(firstBlk);
|
||||
|
||||
|
@ -590,6 +594,8 @@ public class TestFileTruncate {
|
|||
assertThat(contentSummary.getLength(), is(6L));
|
||||
|
||||
fs.delete(src, false);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem().getBlockManager());
|
||||
assertBlockNotPresent(firstBlk);
|
||||
|
||||
// Diskspace consumed should be 0 bytes * 3. []
|
||||
|
@ -1256,7 +1262,8 @@ public class TestFileTruncate {
|
|||
cluster.getNamesystem().getFSDirectory().getBlockManager()
|
||||
.getTotalBlocks());
|
||||
fs.delete(p, true);
|
||||
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem().getBlockManager());
|
||||
assertEquals("block num should 0", 0,
|
||||
cluster.getNamesystem().getFSDirectory().getBlockManager()
|
||||
.getTotalBlocks());
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -140,6 +141,8 @@ public class TestLargeDirectoryDelete {
|
|||
|
||||
final long start = Time.now();
|
||||
mc.getFileSystem().delete(new Path("/root"), true); // recursive delete
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
mc.getNamesystem(0).getBlockManager());
|
||||
final long end = Time.now();
|
||||
threads[0].endThread();
|
||||
threads[1].endThread();
|
||||
|
|
|
@ -139,6 +139,8 @@ public class TestMetaSave {
|
|||
nnRpc.delete("/filestatus0", true);
|
||||
nnRpc.delete("/filestatus1", true);
|
||||
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem().getBlockManager());
|
||||
nnRpc.metaSave("metasaveAfterDelete.out.txt");
|
||||
|
||||
// Verification
|
||||
|
|
|
@ -1031,7 +1031,8 @@ public class TestNameNodeMXBean {
|
|||
@Test
|
||||
public void testTotalBlocksMetrics() throws Exception {
|
||||
MiniDFSCluster cluster = null;
|
||||
FSNamesystem namesystem = null;
|
||||
FSNamesystem activeNn = null;
|
||||
FSNamesystem standbyNn = null;
|
||||
DistributedFileSystem fs = null;
|
||||
try {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
|
@ -1046,12 +1047,16 @@ public class TestNameNodeMXBean {
|
|||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(totalSize).build();
|
||||
namesystem = cluster.getNamesystem();
|
||||
fs = cluster.getFileSystem();
|
||||
.nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(1)).
|
||||
numDataNodes(totalSize).build();
|
||||
cluster.waitActive();
|
||||
cluster.transitionToActive(0);
|
||||
activeNn = cluster.getNamesystem(0);
|
||||
standbyNn = cluster.getNamesystem(1);
|
||||
fs = cluster.getFileSystem(0);
|
||||
fs.enableErasureCodingPolicy(
|
||||
StripedFileTestUtil.getDefaultECPolicy().getName());
|
||||
verifyTotalBlocksMetrics(0L, 0L, namesystem.getTotalBlocks());
|
||||
verifyTotalBlocksMetrics(0L, 0L, activeNn.getTotalBlocks());
|
||||
|
||||
// create small file
|
||||
Path replDirPath = new Path("/replicated");
|
||||
|
@ -1068,7 +1073,7 @@ public class TestNameNodeMXBean {
|
|||
final int smallLength = cellSize * dataBlocks;
|
||||
final byte[] smallBytes = StripedFileTestUtil.generateBytes(smallLength);
|
||||
DFSTestUtil.writeFile(fs, ecFileSmall, smallBytes);
|
||||
verifyTotalBlocksMetrics(1L, 1L, namesystem.getTotalBlocks());
|
||||
verifyTotalBlocksMetrics(1L, 1L, activeNn.getTotalBlocks());
|
||||
|
||||
// create learge file
|
||||
Path replFileLarge = new Path(replDirPath, "replfile_large");
|
||||
|
@ -1079,15 +1084,20 @@ public class TestNameNodeMXBean {
|
|||
final int largeLength = blockSize * totalSize + smallLength;
|
||||
final byte[] largeBytes = StripedFileTestUtil.generateBytes(largeLength);
|
||||
DFSTestUtil.writeFile(fs, ecFileLarge, largeBytes);
|
||||
verifyTotalBlocksMetrics(3L, 3L, namesystem.getTotalBlocks());
|
||||
verifyTotalBlocksMetrics(3L, 3L, activeNn.getTotalBlocks());
|
||||
|
||||
// delete replicated files
|
||||
fs.delete(replDirPath, true);
|
||||
verifyTotalBlocksMetrics(0L, 3L, namesystem.getTotalBlocks());
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
verifyTotalBlocksMetrics(0L, 3L, activeNn.getTotalBlocks());
|
||||
|
||||
// delete ec files
|
||||
fs.delete(ecDirPath, true);
|
||||
verifyTotalBlocksMetrics(0L, 0L, namesystem.getTotalBlocks());
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
verifyTotalBlocksMetrics(0L, 0L, activeNn.getTotalBlocks());
|
||||
verifyTotalBlocksMetrics(0L, 0L, standbyNn.getTotalBlocks());
|
||||
} finally {
|
||||
if (fs != null) {
|
||||
try {
|
||||
|
@ -1096,9 +1106,16 @@ public class TestNameNodeMXBean {
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
if (namesystem != null) {
|
||||
if (activeNn != null) {
|
||||
try {
|
||||
namesystem.close();
|
||||
activeNn.close();
|
||||
} catch (Exception e) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
if (standbyNn != null) {
|
||||
try {
|
||||
standbyNn.close();
|
||||
} catch (Exception e) {
|
||||
throw e;
|
||||
}
|
||||
|
|
|
@ -347,6 +347,8 @@ public class TestHASafeMode {
|
|||
// once it starts up
|
||||
banner("Removing the blocks without rolling the edit log");
|
||||
fs.delete(new Path("/test"), true);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
BlockManagerTestUtil.computeAllPendingWork(
|
||||
nn0.getNamesystem().getBlockManager());
|
||||
cluster.triggerHeartbeats();
|
||||
|
@ -386,6 +388,8 @@ public class TestHASafeMode {
|
|||
// ACKed when due to block removals.
|
||||
banner("Removing the blocks without rolling the edit log");
|
||||
fs.delete(new Path("/test"), true);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem(0).getBlockManager());
|
||||
BlockManagerTestUtil.computeAllPendingWork(
|
||||
nn0.getNamesystem().getBlockManager());
|
||||
|
||||
|
|
|
@ -661,6 +661,8 @@ public class TestNameNodeMetrics {
|
|||
// verify ExcessBlocks metric is decremented and
|
||||
// excessReplicateMap is cleared after deleting a file
|
||||
fs.delete(file, true);
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem().getBlockManager());
|
||||
rb = getMetrics(NS_METRICS);
|
||||
assertGauge("ExcessBlocks", 0L, rb);
|
||||
assertEquals(0L, bm.getExcessBlocksCount());
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
|||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||
|
@ -1128,7 +1129,8 @@ public class TestSnapshotDeletion {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testCorrectNumberOfBlocksAfterRestart() throws IOException {
|
||||
public void testCorrectNumberOfBlocksAfterRestart()
|
||||
throws IOException, InterruptedException {
|
||||
final Path foo = new Path("/foo");
|
||||
final Path bar = new Path(foo, "bar");
|
||||
final Path file = new Path(foo, "file");
|
||||
|
@ -1149,9 +1151,10 @@ public class TestSnapshotDeletion {
|
|||
hdfs.delete(bar, true);
|
||||
hdfs.delete(foo, true);
|
||||
|
||||
long numberOfBlocks = cluster.getNamesystem().getBlocksTotal();
|
||||
cluster.restartNameNode(0);
|
||||
assertEquals(numberOfBlocks, cluster.getNamesystem().getBlocksTotal());
|
||||
BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
|
||||
cluster.getNamesystem().getBlockManager());
|
||||
assertEquals(0, cluster.getNamesystem().getBlocksTotal());
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
Loading…
Reference in New Issue