diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 3a12d74120a..886984a1795 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; +import org.apache.hadoop.hdfs.server.namenode.INodesInPath; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; @@ -786,12 +787,13 @@ private static boolean commitBlock(final BlockInfo block, * * @param bc block collection * @param commitBlock - contains client reported block length and generation + * @param iip - INodes in path to bc * @return true if the last block is changed to committed state. * @throws IOException if the block does not have at least a minimal number * of replicas reported from data-nodes. */ public boolean commitOrCompleteLastBlock(BlockCollection bc, - Block commitBlock) throws IOException { + Block commitBlock, INodesInPath iip) throws IOException { if(commitBlock == null) return false; // not committing, this is a block allocation retry BlockInfo lastBlock = bc.getLastBlock(); @@ -811,7 +813,7 @@ public boolean commitOrCompleteLastBlock(BlockCollection bc, if (committed) { addExpectedReplicasToPending(lastBlock); } - completeBlock(lastBlock, false); + completeBlock(lastBlock, iip, false); } return committed; } @@ -841,11 +843,15 @@ public void addExpectedReplicasToPending(BlockInfo blk) { /** * Convert a specified block of the file to a complete block. + * @param curBlock - block to be completed + * @param iip - INodes in path to file containing curBlock; if null, + * this will be resolved internally + * @param force - force completion of the block * @throws IOException if the block does not have at least a minimal number * of replicas reported from data-nodes. */ - private void completeBlock(BlockInfo curBlock, boolean force) - throws IOException { + private void completeBlock(BlockInfo curBlock, INodesInPath iip, + boolean force) throws IOException { if (curBlock.isComplete()) { return; } @@ -860,7 +866,8 @@ private void completeBlock(BlockInfo curBlock, boolean force) "Cannot complete block: block has not been COMMITTED by the client"); } - curBlock.convertToCompleteBlock(); + convertToCompleteBlock(curBlock, iip); + // Since safe-mode only counts complete blocks, and we now have // one more complete block, we need to adjust the total up, and // also count it as safe, if we have at least the minimum replica @@ -874,6 +881,22 @@ private void completeBlock(BlockInfo curBlock, boolean force) curBlock); } + /** + * Convert a specified block of the file to a complete block. + * Skips validity checking and safe mode block total updates; use + * {@link BlockManager#completeBlock} to include these. + * @param curBlock - block to be completed + * @param iip - INodes in path to file containing curBlock; if null, + * this will be resolved internally + * @throws IOException if the block does not have at least a minimal number + * of replicas reported from data-nodes. + */ + private void convertToCompleteBlock(BlockInfo curBlock, INodesInPath iip) + throws IOException { + curBlock.convertToCompleteBlock(); + namesystem.getFSDirectory().updateSpaceForCompleteBlock(curBlock, iip); + } + /** * Force the given block in the given file to be marked as complete, * regardless of whether enough replicas are present. This is necessary @@ -881,7 +904,7 @@ private void completeBlock(BlockInfo curBlock, boolean force) */ public void forceCompleteBlock(final BlockInfo block) throws IOException { block.commitBlock(block); - completeBlock(block, true); + completeBlock(block, null, true); } /** @@ -2910,7 +2933,7 @@ private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported, int numCurrentReplica = countLiveNodes(storedBlock); if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED && hasMinStorage(storedBlock, numCurrentReplica)) { - completeBlock(storedBlock, false); + completeBlock(storedBlock, null, false); } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) { // check whether safe replication is reached for the block // only complete blocks are counted towards that. @@ -2985,7 +3008,7 @@ private Block addStoredBlock(final BlockInfo block, if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED && hasMinStorage(storedBlock, numLiveReplicas)) { addExpectedReplicasToPending(storedBlock); - completeBlock(storedBlock, false); + completeBlock(storedBlock, null, false); } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) { // check whether safe replication is reached for the block // only complete blocks are counted towards that diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java index e19341cdca6..4c5ecb1d226 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java @@ -418,9 +418,9 @@ static BlockInfo[] unprotectedSetReplication( // Make sure the directory has sufficient quotas short oldBR = file.getPreferredBlockReplication(); + long size = file.computeFileSize(true, true); // Ensure the quota does not exceed if (oldBR < replication) { - long size = file.computeFileSize(true, true); fsd.updateCount(iip, 0L, size, oldBR, replication, true); } @@ -428,14 +428,10 @@ static BlockInfo[] unprotectedSetReplication( short targetReplication = (short) Math.max( replication, file.getPreferredBlockReplication()); + if (oldBR > replication) { + fsd.updateCount(iip, 0L, size, oldBR, targetReplication, true); + } for (BlockInfo b : file.getBlocks()) { - if (oldBR == targetReplication) { - continue; - } - if (oldBR > replication) { - fsd.updateCount(iip, 0L, b.getNumBytes(), oldBR, targetReplication, - true); - } bm.setReplication(oldBR, targetReplication, b); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index c913c237069..2a3cabb1bb3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException; import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; @@ -930,6 +932,51 @@ static void unprotectedUpdateCount(INodesInPath inodesInPath, } } + /** + * Update the cached quota space for a block that is being completed. + * Must only be called once, as the block is being completed. + * @param completeBlk - Completed block for which to update space + * @param inodes - INodes in path to file containing completeBlk; if null + * this will be resolved internally + */ + public void updateSpaceForCompleteBlock(BlockInfo completeBlk, + INodesInPath inodes) throws IOException { + assert namesystem.hasWriteLock(); + INodesInPath iip = inodes != null ? inodes : + INodesInPath.fromINode(namesystem.getBlockCollection(completeBlk)); + INodeFile fileINode = iip.getLastINode().asFile(); + // Adjust disk space consumption if required + final long diff; + final short replicationFactor; + if (fileINode.isStriped()) { + final ErasureCodingPolicy ecPolicy = + FSDirErasureCodingOp.getErasureCodingPolicy(namesystem, iip); + final short numDataUnits = (short) ecPolicy.getNumDataUnits(); + final short numParityUnits = (short) ecPolicy.getNumParityUnits(); + + final long numBlocks = numDataUnits + numParityUnits; + final long fullBlockGroupSize = + fileINode.getPreferredBlockSize() * numBlocks; + + final BlockInfoStriped striped = + new BlockInfoStriped(completeBlk, ecPolicy); + final long actualBlockGroupSize = striped.spaceConsumed(); + + diff = fullBlockGroupSize - actualBlockGroupSize; + replicationFactor = (short) 1; + } else { + diff = fileINode.getPreferredBlockSize() - completeBlk.getNumBytes(); + replicationFactor = fileINode.getFileReplication(); + } + if (diff > 0) { + try { + updateSpaceConsumed(iip, 0, -diff, replicationFactor); + } catch (IOException e) { + LOG.warn("Unexpected exception while updating disk space.", e); + } + } + } + public EnumCounters getStorageTypeDeltas(byte storagePolicyID, long dsDelta, short oldRep, short newRep) { EnumCounters typeSpaceDeltas = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 3b14eecfb14..045984064d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -186,7 +186,6 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -207,7 +206,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager.SecretManagerState; import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; @@ -3292,40 +3290,7 @@ void commitOrCompleteLastBlock( final Block commitBlock) throws IOException { assert hasWriteLock(); Preconditions.checkArgument(fileINode.isUnderConstruction()); - if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) { - return; - } - - // Adjust disk space consumption if required - final long diff; - final short replicationFactor; - if (fileINode.isStriped()) { - final ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp - .getErasureCodingPolicy(this, iip); - final short numDataUnits = (short) ecPolicy.getNumDataUnits(); - final short numParityUnits = (short) ecPolicy.getNumParityUnits(); - - final long numBlocks = numDataUnits + numParityUnits; - final long fullBlockGroupSize = - fileINode.getPreferredBlockSize() * numBlocks; - - final BlockInfoStriped striped = new BlockInfoStriped(commitBlock, - ecPolicy); - final long actualBlockGroupSize = striped.spaceConsumed(); - - diff = fullBlockGroupSize - actualBlockGroupSize; - replicationFactor = (short) 1; - } else { - diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes(); - replicationFactor = fileINode.getFileReplication(); - } - if (diff > 0) { - try { - dir.updateSpaceConsumed(iip, 0, -diff, replicationFactor); - } catch (IOException e) { - LOG.warn("Unexpected exception while updating disk space.", e); - } - } + blockManager.commitOrCompleteLastBlock(fileINode, commitBlock, iip); } void addCommittedBlocksToPending(final INodeFile pendingFile) { @@ -3583,7 +3548,6 @@ void commitBlockSynchronization(ExtendedBlock oldBlock, /** * @param pendingFile open file that needs to be closed * @param storedBlock last block - * @return Path of the file that was closed. * @throws IOException on error */ @VisibleForTesting @@ -5734,6 +5698,7 @@ public BlockManager getBlockManager() { } /** @return the FSDirectory. */ + @Override public FSDirectory getFSDirectory() { return dir; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java index c6751443d5d..e07376bc9ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java @@ -30,6 +30,8 @@ public interface Namesystem extends RwLock, SafeMode { BlockCollection getBlockCollection(long id); + FSDirectory getFSDirectory(); + void startSecretManagerIfNecessary(); boolean isInSnapshot(long blockCollectionID); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDiskspaceQuotaUpdate.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDiskspaceQuotaUpdate.java index d459db09b2e..deb52083ba1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDiskspaceQuotaUpdate.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDiskspaceQuotaUpdate.java @@ -18,10 +18,14 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; @@ -33,61 +37,79 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.TestFileCreation; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; +import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.ipc.RemoteException; -import org.junit.After; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.GenericTestUtils.LogCapturer; +import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestDiskspaceQuotaUpdate { private static final int BLOCKSIZE = 1024; private static final short REPLICATION = 4; static final long seed = 0L; - private static final Path dir = new Path("/TestQuotaUpdate"); + private static final Path BASE_DIR = new Path("/TestQuotaUpdate"); - private Configuration conf; - private MiniDFSCluster cluster; - private FSDirectory fsdir; - private DistributedFileSystem dfs; + private static Configuration conf; + private static MiniDFSCluster cluster; - @Before - public void setUp() throws Exception { + @BeforeClass + public static void setUp() throws Exception { conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION) .build(); cluster.waitActive(); - - fsdir = cluster.getNamesystem().getFSDirectory(); - dfs = cluster.getFileSystem(); } - @After - public void tearDown() throws Exception { + @AfterClass + public static void tearDown() throws Exception { if (cluster != null) { cluster.shutdown(); cluster = null; } } + private Path getParent(String testName) { + return new Path(BASE_DIR, testName); + } + + private FSDirectory getFSDirectory() { + return cluster.getNamesystem().getFSDirectory(); + } + + private DistributedFileSystem getDFS() throws IOException { + return cluster.getFileSystem(); + } + /** * Test if the quota can be correctly updated for create file */ @Test (timeout=60000) public void testQuotaUpdateWithFileCreate() throws Exception { - final Path foo = new Path(dir, "foo"); + final Path foo = + new Path(getParent(GenericTestUtils.getMethodName()), "foo"); Path createdFile = new Path(foo, "created_file.data"); - dfs.mkdirs(foo); - dfs.setQuota(foo, Long.MAX_VALUE-1, Long.MAX_VALUE-1); + getDFS().mkdirs(foo); + getDFS().setQuota(foo, Long.MAX_VALUE-1, Long.MAX_VALUE-1); long fileLen = BLOCKSIZE * 2 + BLOCKSIZE / 2; - DFSTestUtil.createFile(dfs, createdFile, BLOCKSIZE / 16, + DFSTestUtil.createFile(getDFS(), createdFile, BLOCKSIZE / 16, fileLen, BLOCKSIZE, REPLICATION, seed); - INode fnode = fsdir.getINode4Write(foo.toString()); + INode fnode = getFSDirectory().getINode4Write(foo.toString()); assertTrue(fnode.isDirectory()); assertTrue(fnode.isQuotaSet()); QuotaCounts cnt = fnode.asDirectory().getDirectoryWithQuotaFeature() @@ -101,18 +123,20 @@ public void testQuotaUpdateWithFileCreate() throws Exception { */ @Test (timeout=60000) public void testUpdateQuotaForAppend() throws Exception { - final Path foo = new Path(dir ,"foo"); + final Path foo = + new Path(getParent(GenericTestUtils.getMethodName()), "foo"); final Path bar = new Path(foo, "bar"); long currentFileLen = BLOCKSIZE; - DFSTestUtil.createFile(dfs, bar, currentFileLen, REPLICATION, seed); - dfs.setQuota(foo, Long.MAX_VALUE-1, Long.MAX_VALUE-1); + DFSTestUtil.createFile(getDFS(), bar, currentFileLen, REPLICATION, seed); + getDFS().setQuota(foo, Long.MAX_VALUE-1, Long.MAX_VALUE-1); // append half of the block data, the previous file length is at block // boundary - DFSTestUtil.appendFile(dfs, bar, BLOCKSIZE / 2); + DFSTestUtil.appendFile(getDFS(), bar, BLOCKSIZE / 2); currentFileLen += (BLOCKSIZE / 2); - INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory(); + INodeDirectory fooNode = + getFSDirectory().getINode4Write(foo.toString()).asDirectory(); assertTrue(fooNode.isQuotaSet()); QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature() .getSpaceConsumed(); @@ -120,11 +144,11 @@ public void testUpdateQuotaForAppend() throws Exception { long ds = quota.getStorageSpace(); assertEquals(2, ns); // foo and bar assertEquals(currentFileLen * REPLICATION, ds); - ContentSummary c = dfs.getContentSummary(foo); + ContentSummary c = getDFS().getContentSummary(foo); assertEquals(c.getSpaceConsumed(), ds); // append another block, the previous file length is not at block boundary - DFSTestUtil.appendFile(dfs, bar, BLOCKSIZE); + DFSTestUtil.appendFile(getDFS(), bar, BLOCKSIZE); currentFileLen += BLOCKSIZE; quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed(); @@ -132,11 +156,11 @@ public void testUpdateQuotaForAppend() throws Exception { ds = quota.getStorageSpace(); assertEquals(2, ns); // foo and bar assertEquals(currentFileLen * REPLICATION, ds); - c = dfs.getContentSummary(foo); + c = getDFS().getContentSummary(foo); assertEquals(c.getSpaceConsumed(), ds); // append several blocks - DFSTestUtil.appendFile(dfs, bar, BLOCKSIZE * 3 + BLOCKSIZE / 8); + DFSTestUtil.appendFile(getDFS(), bar, BLOCKSIZE * 3 + BLOCKSIZE / 8); currentFileLen += (BLOCKSIZE * 3 + BLOCKSIZE / 8); quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed(); @@ -144,7 +168,7 @@ public void testUpdateQuotaForAppend() throws Exception { ds = quota.getStorageSpace(); assertEquals(2, ns); // foo and bar assertEquals(currentFileLen * REPLICATION, ds); - c = dfs.getContentSummary(foo); + c = getDFS().getContentSummary(foo); assertEquals(c.getSpaceConsumed(), ds); } @@ -154,16 +178,18 @@ public void testUpdateQuotaForAppend() throws Exception { */ @Test (timeout=60000) public void testUpdateQuotaForFSync() throws Exception { - final Path foo = new Path("/foo"); + final Path foo = + new Path(getParent(GenericTestUtils.getMethodName()), "foo"); final Path bar = new Path(foo, "bar"); - DFSTestUtil.createFile(dfs, bar, BLOCKSIZE, REPLICATION, 0L); - dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1); + DFSTestUtil.createFile(getDFS(), bar, BLOCKSIZE, REPLICATION, 0L); + getDFS().setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1); - FSDataOutputStream out = dfs.append(bar); + FSDataOutputStream out = getDFS().append(bar); out.write(new byte[BLOCKSIZE / 4]); ((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); - INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory(); + INodeDirectory fooNode = + getFSDirectory().getINode4Write(foo.toString()).asDirectory(); QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature() .getSpaceConsumed(); long ns = quota.getNameSpace(); @@ -174,7 +200,7 @@ public void testUpdateQuotaForFSync() throws Exception { out.write(new byte[BLOCKSIZE / 4]); out.close(); - fooNode = fsdir.getINode4Write(foo.toString()).asDirectory(); + fooNode = getFSDirectory().getINode4Write(foo.toString()).asDirectory(); quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed(); ns = quota.getNameSpace(); ds = quota.getStorageSpace(); @@ -182,7 +208,7 @@ public void testUpdateQuotaForFSync() throws Exception { assertEquals((BLOCKSIZE + BLOCKSIZE / 2) * REPLICATION, ds); // append another block - DFSTestUtil.appendFile(dfs, bar, BLOCKSIZE); + DFSTestUtil.appendFile(getDFS(), bar, BLOCKSIZE); quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed(); ns = quota.getNameSpace(); @@ -196,21 +222,21 @@ public void testUpdateQuotaForFSync() throws Exception { */ @Test (timeout=60000) public void testAppendOverStorageQuota() throws Exception { - final Path dir = new Path("/TestAppendOverQuota"); + final Path dir = getParent(GenericTestUtils.getMethodName()); final Path file = new Path(dir, "file"); // create partial block file - dfs.mkdirs(dir); - DFSTestUtil.createFile(dfs, file, BLOCKSIZE/2, REPLICATION, seed); + getDFS().mkdirs(dir); + DFSTestUtil.createFile(getDFS(), file, BLOCKSIZE/2, REPLICATION, seed); // lower quota to cause exception when appending to partial block - dfs.setQuota(dir, Long.MAX_VALUE - 1, 1); - final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString()) - .asDirectory(); + getDFS().setQuota(dir, Long.MAX_VALUE - 1, 1); + final INodeDirectory dirNode = + getFSDirectory().getINode4Write(dir.toString()).asDirectory(); final long spaceUsed = dirNode.getDirectoryWithQuotaFeature() .getSpaceConsumed().getStorageSpace(); try { - DFSTestUtil.appendFile(dfs, file, BLOCKSIZE); + DFSTestUtil.appendFile(getDFS(), file, BLOCKSIZE); Assert.fail("append didn't fail"); } catch (DSQuotaExceededException e) { // ignore @@ -218,7 +244,7 @@ public void testAppendOverStorageQuota() throws Exception { LeaseManager lm = cluster.getNamesystem().getLeaseManager(); // check that the file exists, isn't UC, and has no dangling lease - INodeFile inode = fsdir.getINode(file.toString()).asFile(); + INodeFile inode = getFSDirectory().getINode(file.toString()).asFile(); Assert.assertNotNull(inode); Assert.assertFalse("should not be UC", inode.isUnderConstruction()); Assert.assertNull("should not have a lease", lm.getLease(inode)); @@ -227,7 +253,7 @@ public void testAppendOverStorageQuota() throws Exception { .getSpaceConsumed().getStorageSpace(); assertEquals(spaceUsed, newSpaceUsed); // make sure edits aren't corrupted - dfs.recoverLease(file); + getDFS().recoverLease(file); cluster.restartNameNodes(); } @@ -237,23 +263,23 @@ public void testAppendOverStorageQuota() throws Exception { */ @Test (timeout=60000) public void testAppendOverTypeQuota() throws Exception { - final Path dir = new Path("/TestAppendOverTypeQuota"); + final Path dir = getParent(GenericTestUtils.getMethodName()); final Path file = new Path(dir, "file"); // create partial block file - dfs.mkdirs(dir); + getDFS().mkdirs(dir); // set the storage policy on dir - dfs.setStoragePolicy(dir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME); - DFSTestUtil.createFile(dfs, file, BLOCKSIZE/2, REPLICATION, seed); + getDFS().setStoragePolicy(dir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME); + DFSTestUtil.createFile(getDFS(), file, BLOCKSIZE/2, REPLICATION, seed); // set quota of SSD to 1L - dfs.setQuotaByStorageType(dir, StorageType.SSD, 1L); - final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString()) - .asDirectory(); + getDFS().setQuotaByStorageType(dir, StorageType.SSD, 1L); + final INodeDirectory dirNode = + getFSDirectory().getINode4Write(dir.toString()).asDirectory(); final long spaceUsed = dirNode.getDirectoryWithQuotaFeature() .getSpaceConsumed().getStorageSpace(); try { - DFSTestUtil.appendFile(dfs, file, BLOCKSIZE); + DFSTestUtil.appendFile(getDFS(), file, BLOCKSIZE); Assert.fail("append didn't fail"); } catch (QuotaByStorageTypeExceededException e) { //ignore @@ -261,7 +287,7 @@ public void testAppendOverTypeQuota() throws Exception { // check that the file exists, isn't UC, and has no dangling lease LeaseManager lm = cluster.getNamesystem().getLeaseManager(); - INodeFile inode = fsdir.getINode(file.toString()).asFile(); + INodeFile inode = getFSDirectory().getINode(file.toString()).asFile(); Assert.assertNotNull(inode); Assert.assertFalse("should not be UC", inode.isUnderConstruction()); Assert.assertNull("should not have a lease", lm.getLease(inode)); @@ -270,7 +296,7 @@ public void testAppendOverTypeQuota() throws Exception { .getSpaceConsumed().getStorageSpace(); assertEquals(spaceUsed, newSpaceUsed); // make sure edits aren't corrupted - dfs.recoverLease(file); + getDFS().recoverLease(file); cluster.restartNameNodes(); } @@ -279,21 +305,21 @@ public void testAppendOverTypeQuota() throws Exception { */ @Test (timeout=60000) public void testTruncateOverQuota() throws Exception { - final Path dir = new Path("/TestTruncateOverquota"); + final Path dir = getParent(GenericTestUtils.getMethodName()); final Path file = new Path(dir, "file"); // create partial block file - dfs.mkdirs(dir); - DFSTestUtil.createFile(dfs, file, BLOCKSIZE/2, REPLICATION, seed); + getDFS().mkdirs(dir); + DFSTestUtil.createFile(getDFS(), file, BLOCKSIZE/2, REPLICATION, seed); // lower quota to cause exception when appending to partial block - dfs.setQuota(dir, Long.MAX_VALUE - 1, 1); - final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString()) - .asDirectory(); + getDFS().setQuota(dir, Long.MAX_VALUE - 1, 1); + final INodeDirectory dirNode = + getFSDirectory().getINode4Write(dir.toString()).asDirectory(); final long spaceUsed = dirNode.getDirectoryWithQuotaFeature() .getSpaceConsumed().getStorageSpace(); try { - dfs.truncate(file, BLOCKSIZE / 2 - 1); + getDFS().truncate(file, BLOCKSIZE / 2 - 1); Assert.fail("truncate didn't fail"); } catch (RemoteException e) { assertTrue(e.getClassName().contains("DSQuotaExceededException")); @@ -301,7 +327,7 @@ public void testTruncateOverQuota() throws Exception { // check that the file exists, isn't UC, and has no dangling lease LeaseManager lm = cluster.getNamesystem().getLeaseManager(); - INodeFile inode = fsdir.getINode(file.toString()).asFile(); + INodeFile inode = getFSDirectory().getINode(file.toString()).asFile(); Assert.assertNotNull(inode); Assert.assertFalse("should not be UC", inode.isUnderConstruction()); Assert.assertNull("should not have a lease", lm.getLease(inode)); @@ -310,7 +336,7 @@ public void testTruncateOverQuota() throws Exception { .getSpaceConsumed().getStorageSpace(); assertEquals(spaceUsed, newSpaceUsed); // make sure edits aren't corrupted - dfs.recoverLease(file); + getDFS().recoverLease(file); cluster.restartNameNodes(); } @@ -320,33 +346,34 @@ public void testTruncateOverQuota() throws Exception { @Test public void testQuotaInitialization() throws Exception { final int size = 500; - Path testDir = new Path("/testDir"); + Path testDir = + new Path(getParent(GenericTestUtils.getMethodName()), "testDir"); long expectedSize = 3 * BLOCKSIZE + BLOCKSIZE/2; - dfs.mkdirs(testDir); - dfs.setQuota(testDir, size*4, expectedSize*size*2); + getDFS().mkdirs(testDir); + getDFS().setQuota(testDir, size*4, expectedSize*size*2); Path[] testDirs = new Path[size]; for (int i = 0; i < size; i++) { testDirs[i] = new Path(testDir, "sub" + i); - dfs.mkdirs(testDirs[i]); - dfs.setQuota(testDirs[i], 100, 1000000); - DFSTestUtil.createFile(dfs, new Path(testDirs[i], "a"), expectedSize, + getDFS().mkdirs(testDirs[i]); + getDFS().setQuota(testDirs[i], 100, 1000000); + DFSTestUtil.createFile(getDFS(), new Path(testDirs[i], "a"), expectedSize, (short)1, 1L); } // Directly access the name system to obtain the current cached usage. - INodeDirectory root = fsdir.getRoot(); + INodeDirectory root = getFSDirectory().getRoot(); HashMap nsMap = new HashMap(); HashMap dsMap = new HashMap(); scanDirsWithQuota(root, nsMap, dsMap, false); - fsdir.updateCountForQuota(1); + getFSDirectory().updateCountForQuota(1); scanDirsWithQuota(root, nsMap, dsMap, true); - fsdir.updateCountForQuota(2); + getFSDirectory().updateCountForQuota(2); scanDirsWithQuota(root, nsMap, dsMap, true); - fsdir.updateCountForQuota(4); + getFSDirectory().updateCountForQuota(4); scanDirsWithQuota(root, nsMap, dsMap, true); } @@ -372,4 +399,130 @@ private void scanDirsWithQuota(INodeDirectory dir, } } } + + /** + * Test that the cached quota stays correct between the COMMIT + * and COMPLETE block steps, even if the replication factor is + * changed during this time. + */ + @Test (timeout=60000) + public void testQuotaIssuesWhileCommitting() throws Exception { + // We want a one-DN cluster so that we can force a lack of + // commit by only instrumenting a single DN; we kill the other 3 + List dnprops = new ArrayList<>(); + try { + for (int i = REPLICATION - 1; i > 0; i--) { + dnprops.add(cluster.stopDataNode(i)); + } + + DatanodeProtocolClientSideTranslatorPB nnSpy = + InternalDataNodeTestUtils.spyOnBposToNN( + cluster.getDataNodes().get(0), cluster.getNameNode()); + + testQuotaIssuesWhileCommittingHelper(nnSpy, (short) 1, (short) 4); + testQuotaIssuesWhileCommittingHelper(nnSpy, (short) 4, (short) 1); + + // Don't actually change replication; just check that the sizes + // agree during the commit period + testQuotaIssuesWhileCommittingHelper(nnSpy, (short) 1, (short) 1); + } finally { + for (MiniDFSCluster.DataNodeProperties dnprop : dnprops) { + cluster.restartDataNode(dnprop, true); + } + cluster.waitActive(); + } + } + + private void testQuotaIssuesWhileCommittingHelper( + DatanodeProtocolClientSideTranslatorPB nnSpy, + final short initialReplication, final short finalReplication) + throws Exception { + final String logStmt = + "BUG: Inconsistent storagespace for directory"; + final Path dir = new Path(getParent(GenericTestUtils.getMethodName()), + String.format("%d-%d", initialReplication, finalReplication)); + final Path file = new Path(dir, "testfile"); + + LogCapturer logs = LogCapturer.captureLogs(NameNode.LOG); + + Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + if (finalReplication != initialReplication) { + getDFS().setReplication(file, finalReplication); + } + // Call getContentSummary before the DN can notify the NN + // that the block has been received to check for discrepancy + getDFS().getContentSummary(dir); + invocation.callRealMethod(); + return null; + } + }).when(nnSpy).blockReceivedAndDeleted( + Mockito.anyObject(), + Mockito.anyString(), + Mockito.anyObject() + ); + + getDFS().mkdirs(dir); + getDFS().setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1); + + DFSTestUtil.createFile(getDFS(), file, BLOCKSIZE/2, initialReplication, 1L); + + // Also check for discrepancy after completing the file + getDFS().getContentSummary(dir); + assertFalse(logs.getOutput().contains(logStmt)); + } + + /** + * Test that the cached quota remains correct when the block has been + * written to but not yet committed, even if the replication factor + * is updated during this time. + */ + private void testQuotaIssuesBeforeCommitting(short initialReplication, + short finalReplication) throws Exception { + final String logStmt = + "BUG: Inconsistent storagespace for directory"; + final Path dir = new Path(getParent(GenericTestUtils.getMethodName()), + String.format("%d-%d", initialReplication, finalReplication)); + final Path file = new Path(dir, "testfile"); + + LogCapturer logs = LogCapturer.captureLogs(NameNode.LOG); + + getDFS().mkdirs(dir); + getDFS().setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1); + + FSDataOutputStream out = + TestFileCreation.createFile(getDFS(), file, initialReplication); + TestFileCreation.writeFile(out, BLOCKSIZE / 2); + out.hflush(); + + getDFS().getContentSummary(dir); + if (finalReplication != initialReplication) { + // While the block is visible to the NN but has not yet been committed, + // change the replication + getDFS().setReplication(file, finalReplication); + } + + out.close(); + + getDFS().getContentSummary(dir); + assertFalse(logs.getOutput().contains(logStmt)); + } + + @Test (timeout=60000) + public void testCachedComputedSizesAgreeBeforeCommitting() throws Exception { + // Don't actually change replication; just check that the sizes + // agree before the commit period + testQuotaIssuesBeforeCommitting((short)1, (short)1); + } + + @Test (timeout=60000) + public void testDecreaseReplicationBeforeCommitting() throws Exception { + testQuotaIssuesBeforeCommitting((short)4, (short)1); + } + + @Test (timeout=60000) + public void testIncreaseReplicationBeforeCommitting() throws Exception { + testQuotaIssuesBeforeCommitting((short)1, (short)4); + } }