HDFS-10843. Update space quota when a UC block is completed rather than committed. Contributed by Erik Krogen.
parent bbdf350ff9
commit a5bb88c8e0
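Summary: the NameNode previously released the space over-reserved for the last block of a file in FSNamesystem.commitOrCompleteLastBlock(), i.e. when the block was COMMITTED. Between COMMIT and COMPLETE the cached quota usage could then disagree with the usage recomputed from the blocks, triggering the "BUG: Inconsistent storagespace for directory" warning. This patch moves the adjustment to completion time: BlockManager.completeBlock() now hands the update to the new FSDirectory#updateSpaceForCompleteBlock(). The model below illustrates the accounting for a replicated file; it is a standalone sketch with example values, not NameNode code:

    /** Illustrative model of the accounting this patch moves; example values only. */
    public class QuotaAccountingModel {
      public static void main(String[] args) {
        final long preferredBlockSize = 1024;  // the file's dfs.blocksize
        final short replication = 4;
        final long actualBytes = 512;          // partial last block

        // While the last block is under construction, the directory's cached
        // storagespace usage charges a full preferred block per replica.
        long chargedWhileUC = preferredBlockSize * replication;

        // When the block becomes COMPLETE, the unused reservation is released:
        // diff = preferredBlockSize - actualBytes, at the file's replication.
        long diff = preferredBlockSize - actualBytes;
        long releasedOnCompletion = diff * replication;

        System.out.println("charged while UC:       " + chargedWhileUC);        // 4096
        System.out.println("released on completion: " + releasedOnCompletion);  // 2048
        System.out.println("final charge:           "
            + (chargedWhileUC - releasedOnCompletion));                         // 2048
      }
    }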
BlockManager.java

@@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
+import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
@@ -786,12 +787,13 @@ public class BlockManager implements BlockStatsMXBean {
    *
    * @param bc block collection
    * @param commitBlock - contains client reported block length and generation
+   * @param iip - INodes in path to bc
    * @return true if the last block is changed to committed state.
    * @throws IOException if the block does not have at least a minimal number
    * of replicas reported from data-nodes.
    */
   public boolean commitOrCompleteLastBlock(BlockCollection bc,
-      Block commitBlock) throws IOException {
+      Block commitBlock, INodesInPath iip) throws IOException {
     if(commitBlock == null)
       return false; // not committing, this is a block allocation retry
     BlockInfo lastBlock = bc.getLastBlock();
@@ -811,7 +813,7 @@ public class BlockManager implements BlockStatsMXBean {
       if (committed) {
         addExpectedReplicasToPending(lastBlock);
       }
-      completeBlock(lastBlock, false);
+      completeBlock(lastBlock, iip, false);
     }
     return committed;
   }
@@ -841,11 +843,15 @@ public class BlockManager implements BlockStatsMXBean {

   /**
    * Convert a specified block of the file to a complete block.
    * @param curBlock - block to be completed
+   * @param iip - INodes in path to file containing curBlock; if null,
+   *              this will be resolved internally
    * @param force - force completion of the block
    * @throws IOException if the block does not have at least a minimal number
    * of replicas reported from data-nodes.
    */
-  private void completeBlock(BlockInfo curBlock, boolean force)
-      throws IOException {
+  private void completeBlock(BlockInfo curBlock, INodesInPath iip,
+      boolean force) throws IOException {
     if (curBlock.isComplete()) {
       return;
     }
@@ -860,7 +866,8 @@ public class BlockManager implements BlockStatsMXBean {
           "Cannot complete block: block has not been COMMITTED by the client");
     }

-    curBlock.convertToCompleteBlock();
+    convertToCompleteBlock(curBlock, iip);

     // Since safe-mode only counts complete blocks, and we now have
     // one more complete block, we need to adjust the total up, and
     // also count it as safe, if we have at least the minimum replica
@@ -874,6 +881,22 @@ public class BlockManager implements BlockStatsMXBean {
         curBlock);
   }

+  /**
+   * Convert a specified block of the file to a complete block.
+   * Skips validity checking and safe mode block total updates; use
+   * {@link BlockManager#completeBlock} to include these.
+   * @param curBlock - block to be completed
+   * @param iip - INodes in path to file containing curBlock; if null,
+   *              this will be resolved internally
+   * @throws IOException if the block does not have at least a minimal number
+   * of replicas reported from data-nodes.
+   */
+  private void convertToCompleteBlock(BlockInfo curBlock, INodesInPath iip)
+      throws IOException {
+    curBlock.convertToCompleteBlock();
+    namesystem.getFSDirectory().updateSpaceForCompleteBlock(curBlock, iip);
+  }
+
   /**
    * Force the given block in the given file to be marked as complete,
    * regardless of whether enough replicas are present. This is necessary
@@ -881,7 +904,7 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public void forceCompleteBlock(final BlockInfo block) throws IOException {
     block.commitBlock(block);
-    completeBlock(block, true);
+    completeBlock(block, null, true);
   }

   /**
@@ -2910,7 +2933,7 @@ public class BlockManager implements BlockStatsMXBean {
     int numCurrentReplica = countLiveNodes(storedBlock);
     if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
         && hasMinStorage(storedBlock, numCurrentReplica)) {
-      completeBlock(storedBlock, false);
+      completeBlock(storedBlock, null, false);
     } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
       // check whether safe replication is reached for the block
       // only complete blocks are counted towards that.
@@ -2985,7 +3008,7 @@ public class BlockManager implements BlockStatsMXBean {
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
         hasMinStorage(storedBlock, numLiveReplicas)) {
       addExpectedReplicasToPending(storedBlock);
-      completeBlock(storedBlock, false);
+      completeBlock(storedBlock, null, false);
     } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
       // check whether safe replication is reached for the block
       // only complete blocks are counted towards that
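All completion paths now funnel through completeBlock(..., iip, ...) into convertToCompleteBlock(), which hands the quota update to FSDirectory. Callers that do not hold the file's path (forceCompleteBlock() and the block-report paths above) pass null, and FSDirectory resolves it from the block's collection. A minimal standalone sketch of that resolve-if-null contract, with hypothetical names:

    import java.util.function.Function;

    public class ResolveIfNull {
      // Use the caller-supplied path when present; otherwise derive it from
      // the block itself (as FSDirectory does via INodesInPath.fromINode).
      static <B, P> P resolve(P path, B block, Function<B, P> resolver) {
        return path != null ? path : resolver.apply(block);
      }

      public static void main(String[] args) {
        Function<String, String> fromBlock = b -> "/resolved/path/for/" + b;
        System.out.println(resolve("/known/path", "blk_1", fromBlock)); // caller's path
        System.out.println(resolve(null, "blk_1", fromBlock));          // resolved
      }
    }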
FSDirAttrOp.java

@@ -418,9 +418,9 @@ public class FSDirAttrOp {
     // Make sure the directory has sufficient quotas
     short oldBR = file.getPreferredBlockReplication();

+    long size = file.computeFileSize(true, true);
     // Ensure the quota does not exceed
     if (oldBR < replication) {
-      long size = file.computeFileSize(true, true);
       fsd.updateCount(iip, 0L, size, oldBR, replication, true);
     }

@@ -428,14 +428,10 @@ public class FSDirAttrOp {
     short targetReplication = (short) Math.max(
         replication, file.getPreferredBlockReplication());

+    if (oldBR > replication) {
+      fsd.updateCount(iip, 0L, size, oldBR, targetReplication, true);
+    }
     for (BlockInfo b : file.getBlocks()) {
-      if (oldBR == targetReplication) {
-        continue;
-      }
-      if (oldBR > replication) {
-        fsd.updateCount(iip, 0L, b.getNumBytes(), oldBR, targetReplication,
-            true);
-      }
       bm.setReplication(oldBR, targetReplication, b);
     }

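The setReplication path now charges the replication delta once against the whole file size from computeFileSize(true, true), which counts a still-under-construction last block at its full preferred size, instead of summing b.getNumBytes() per block inside the loop. That keeps the cached usage consistent with the full-block reservation taken while the last block is open. A standalone sketch of the difference (example values, not FSDirAttrOp code):

    /** Illustrative only: charging the replication delta against the computed
     *  file size (UC block at full preferred size) matches the cached quota;
     *  summing actual block lengths does not. Example values. */
    public class SetReplicationDelta {
      public static void main(String[] args) {
        final long preferredBlockSize = 1024;
        final long[] completedBlocks = {1024, 1024};
        final long ucBlockBytes = 512;   // last block still under construction
        final short oldRepl = 4, newRepl = 1;

        // computeFileSize(true, true) analogue: UC block at full preferred size
        long sizeWithFullUcBlock = completedBlocks[0] + completedBlocks[1]
            + preferredBlockSize;

        // Old per-block approach: actual bytes of the UC block
        long sizeWithActualUcBlock = completedBlocks[0] + completedBlocks[1]
            + ucBlockBytes;

        // The cache charged the UC block at full preferred size, so only the
        // first delta keeps cached and recomputed usage in agreement.
        System.out.println("delta (whole-size): "
            + sizeWithFullUcBlock * (newRepl - oldRepl));    // -9216
        System.out.println("delta (per-block):  "
            + sizeWithActualUcBlock * (newRepl - oldRepl));  // -7680
      }
    }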
FSDirectory.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -930,6 +932,51 @@ public class FSDirectory implements Closeable {
     }
   }

+  /**
+   * Update the cached quota space for a block that is being completed.
+   * Must only be called once, as the block is being completed.
+   * @param completeBlk - Completed block for which to update space
+   * @param inodes - INodes in path to file containing completeBlk; if null
+   *                 this will be resolved internally
+   */
+  public void updateSpaceForCompleteBlock(BlockInfo completeBlk,
+      INodesInPath inodes) throws IOException {
+    assert namesystem.hasWriteLock();
+    INodesInPath iip = inodes != null ? inodes :
+        INodesInPath.fromINode(namesystem.getBlockCollection(completeBlk));
+    INodeFile fileINode = iip.getLastINode().asFile();
+    // Adjust disk space consumption if required
+    final long diff;
+    final short replicationFactor;
+    if (fileINode.isStriped()) {
+      final ErasureCodingPolicy ecPolicy =
+          FSDirErasureCodingOp.getErasureCodingPolicy(namesystem, iip);
+      final short numDataUnits = (short) ecPolicy.getNumDataUnits();
+      final short numParityUnits = (short) ecPolicy.getNumParityUnits();
+
+      final long numBlocks = numDataUnits + numParityUnits;
+      final long fullBlockGroupSize =
+          fileINode.getPreferredBlockSize() * numBlocks;
+
+      final BlockInfoStriped striped =
+          new BlockInfoStriped(completeBlk, ecPolicy);
+      final long actualBlockGroupSize = striped.spaceConsumed();
+
+      diff = fullBlockGroupSize - actualBlockGroupSize;
+      replicationFactor = (short) 1;
+    } else {
+      diff = fileINode.getPreferredBlockSize() - completeBlk.getNumBytes();
+      replicationFactor = fileINode.getFileReplication();
+    }
+    if (diff > 0) {
+      try {
+        updateSpaceConsumed(iip, 0, -diff, replicationFactor);
+      } catch (IOException e) {
+        LOG.warn("Unexpected exception while updating disk space.", e);
+      }
+    }
+  }
+
   public EnumCounters<StorageType> getStorageTypeDeltas(byte storagePolicyID,
       long dsDelta, short oldRep, short newRep) {
     EnumCounters<StorageType> typeSpaceDeltas =
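For striped files, the space reserved while the block group is under construction is a full group (preferred block size times data plus parity units), and BlockInfoStriped.spaceConsumed() reports what the finished group actually occupies across all of its internal blocks; the difference is released once, at replication factor 1. A numeric sketch with assumed values (the policy, sizes, and the spaceConsumed() result here are examples, not taken from the patch):

    /** Illustrative numbers for the striped branch of
     *  updateSpaceForCompleteBlock; policy and sizes are example values. */
    public class StripedReleaseExample {
      public static void main(String[] args) {
        final long preferredBlockSize = 128L * 1024 * 1024;
        final int numDataUnits = 6, numParityUnits = 3;   // e.g. RS(6,3)

        // Reserved while the block group was under construction.
        long fullBlockGroupSize =
            preferredBlockSize * (numDataUnits + numParityUnits);

        // Assume spaceConsumed() reported this much for the finished group
        // (data plus parity actually on disk).
        long actualBlockGroupSize = 150L * 1024 * 1024;

        // Released once at replication factor 1, since spaceConsumed()
        // already covers every internal block of the group.
        long diff = fullBlockGroupSize - actualBlockGroupSize;
        System.out.println("released on completion: " + diff);
      }
    }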
FSNamesystem.java

@@ -186,7 +186,6 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -207,7 +206,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretMan
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager.SecretManagerState;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -3292,40 +3290,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       final Block commitBlock) throws IOException {
     assert hasWriteLock();
     Preconditions.checkArgument(fileINode.isUnderConstruction());
-    if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) {
-      return;
-    }
-
-    // Adjust disk space consumption if required
-    final long diff;
-    final short replicationFactor;
-    if (fileINode.isStriped()) {
-      final ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp
-          .getErasureCodingPolicy(this, iip);
-      final short numDataUnits = (short) ecPolicy.getNumDataUnits();
-      final short numParityUnits = (short) ecPolicy.getNumParityUnits();
-
-      final long numBlocks = numDataUnits + numParityUnits;
-      final long fullBlockGroupSize =
-          fileINode.getPreferredBlockSize() * numBlocks;
-
-      final BlockInfoStriped striped = new BlockInfoStriped(commitBlock,
-          ecPolicy);
-      final long actualBlockGroupSize = striped.spaceConsumed();
-
-      diff = fullBlockGroupSize - actualBlockGroupSize;
-      replicationFactor = (short) 1;
-    } else {
-      diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();
-      replicationFactor = fileINode.getFileReplication();
-    }
-    if (diff > 0) {
-      try {
-        dir.updateSpaceConsumed(iip, 0, -diff, replicationFactor);
-      } catch (IOException e) {
-        LOG.warn("Unexpected exception while updating disk space.", e);
-      }
-    }
+    blockManager.commitOrCompleteLastBlock(fileINode, commitBlock, iip);
   }

   void addCommittedBlocksToPending(final INodeFile pendingFile) {
@@ -3583,7 +3548,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   /**
    * @param pendingFile open file that needs to be closed
    * @param storedBlock last block
    * @return Path of the file that was closed.
    * @throws IOException on error
    */
   @VisibleForTesting
@@ -5734,6 +5698,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }

   /** @return the FSDirectory. */
+  @Override
   public FSDirectory getFSDirectory() {
     return dir;
   }
Namesystem.java

@@ -30,6 +30,8 @@ public interface Namesystem extends RwLock, SafeMode {

   BlockCollection getBlockCollection(long id);

+  FSDirectory getFSDirectory();
+
   void startSecretManagerIfNecessary();

   boolean isInSnapshot(long blockCollectionID);
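Exposing getFSDirectory() on the Namesystem interface is what lets BlockManager, which lives in the blockmanagement package and holds only a Namesystem reference, reach FSDirectory without depending on FSNamesystem directly; FSNamesystem's existing accessor simply picks up @Override. A toy sketch of that dependency shape (all names are hypothetical stand-ins):

    // Hypothetical stand-ins for Namesystem, FSDirectory, FSNamesystem,
    // and BlockManager's view of them.
    interface Namesys {
      Dir getDir();
    }

    class Dir {
      void updateSpaceForCompleteBlock() {
        System.out.println("quota adjusted at completion time");
      }
    }

    class ConcreteNamesys implements Namesys {
      private final Dir dir = new Dir();
      @Override
      public Dir getDir() {
        return dir;
      }
    }

    public class BlockSideCaller {
      public static void main(String[] args) {
        Namesys ns = new ConcreteNamesys();          // injected, interface-typed
        ns.getDir().updateSpaceForCompleteBlock();   // as convertToCompleteBlock() does
      }
    }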
TestDiskspaceQuotaUpdate.java

@@ -18,10 +18,14 @@
 package org.apache.hadoop.hdfs.server.namenode;

 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;

+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -33,61 +37,79 @@ import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.TestFileCreation;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
+import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.ipc.RemoteException;
-import org.junit.After;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;

 public class TestDiskspaceQuotaUpdate {
   private static final int BLOCKSIZE = 1024;
   private static final short REPLICATION = 4;
   static final long seed = 0L;
-  private static final Path dir = new Path("/TestQuotaUpdate");
+  private static final Path BASE_DIR = new Path("/TestQuotaUpdate");

-  private Configuration conf;
-  private MiniDFSCluster cluster;
-  private FSDirectory fsdir;
-  private DistributedFileSystem dfs;
+  private static Configuration conf;
+  private static MiniDFSCluster cluster;

-  @Before
-  public void setUp() throws Exception {
+  @BeforeClass
+  public static void setUp() throws Exception {
     conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
         .build();
     cluster.waitActive();
-
-    fsdir = cluster.getNamesystem().getFSDirectory();
-    dfs = cluster.getFileSystem();
   }

-  @After
-  public void tearDown() throws Exception {
+  @AfterClass
+  public static void tearDown() throws Exception {
     if (cluster != null) {
       cluster.shutdown();
       cluster = null;
     }
   }

+  private Path getParent(String testName) {
+    return new Path(BASE_DIR, testName);
+  }
+
+  private FSDirectory getFSDirectory() {
+    return cluster.getNamesystem().getFSDirectory();
+  }
+
+  private DistributedFileSystem getDFS() throws IOException {
+    return cluster.getFileSystem();
+  }
+
   /**
    * Test if the quota can be correctly updated for create file
    */
   @Test (timeout=60000)
   public void testQuotaUpdateWithFileCreate() throws Exception {
-    final Path foo = new Path(dir, "foo");
+    final Path foo =
+        new Path(getParent(GenericTestUtils.getMethodName()), "foo");
     Path createdFile = new Path(foo, "created_file.data");
-    dfs.mkdirs(foo);
-    dfs.setQuota(foo, Long.MAX_VALUE-1, Long.MAX_VALUE-1);
+    getDFS().mkdirs(foo);
+    getDFS().setQuota(foo, Long.MAX_VALUE-1, Long.MAX_VALUE-1);
     long fileLen = BLOCKSIZE * 2 + BLOCKSIZE / 2;
-    DFSTestUtil.createFile(dfs, createdFile, BLOCKSIZE / 16,
+    DFSTestUtil.createFile(getDFS(), createdFile, BLOCKSIZE / 16,
         fileLen, BLOCKSIZE, REPLICATION, seed);
-    INode fnode = fsdir.getINode4Write(foo.toString());
+    INode fnode = getFSDirectory().getINode4Write(foo.toString());
     assertTrue(fnode.isDirectory());
     assertTrue(fnode.isQuotaSet());
     QuotaCounts cnt = fnode.asDirectory().getDirectoryWithQuotaFeature()
@@ -101,18 +123,20 @@ public class TestDiskspaceQuotaUpdate {
    */
   @Test (timeout=60000)
   public void testUpdateQuotaForAppend() throws Exception {
-    final Path foo = new Path(dir ,"foo");
+    final Path foo =
+        new Path(getParent(GenericTestUtils.getMethodName()), "foo");
     final Path bar = new Path(foo, "bar");
     long currentFileLen = BLOCKSIZE;
-    DFSTestUtil.createFile(dfs, bar, currentFileLen, REPLICATION, seed);
-    dfs.setQuota(foo, Long.MAX_VALUE-1, Long.MAX_VALUE-1);
+    DFSTestUtil.createFile(getDFS(), bar, currentFileLen, REPLICATION, seed);
+    getDFS().setQuota(foo, Long.MAX_VALUE-1, Long.MAX_VALUE-1);

     // append half of the block data, the previous file length is at block
     // boundary
-    DFSTestUtil.appendFile(dfs, bar, BLOCKSIZE / 2);
+    DFSTestUtil.appendFile(getDFS(), bar, BLOCKSIZE / 2);
     currentFileLen += (BLOCKSIZE / 2);

-    INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory();
+    INodeDirectory fooNode =
+        getFSDirectory().getINode4Write(foo.toString()).asDirectory();
     assertTrue(fooNode.isQuotaSet());
     QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature()
         .getSpaceConsumed();
@@ -120,11 +144,11 @@ public class TestDiskspaceQuotaUpdate {
     long ds = quota.getStorageSpace();
     assertEquals(2, ns); // foo and bar
     assertEquals(currentFileLen * REPLICATION, ds);
-    ContentSummary c = dfs.getContentSummary(foo);
+    ContentSummary c = getDFS().getContentSummary(foo);
     assertEquals(c.getSpaceConsumed(), ds);

     // append another block, the previous file length is not at block boundary
-    DFSTestUtil.appendFile(dfs, bar, BLOCKSIZE);
+    DFSTestUtil.appendFile(getDFS(), bar, BLOCKSIZE);
     currentFileLen += BLOCKSIZE;

     quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
@@ -132,11 +156,11 @@ public class TestDiskspaceQuotaUpdate {
     ds = quota.getStorageSpace();
     assertEquals(2, ns); // foo and bar
     assertEquals(currentFileLen * REPLICATION, ds);
-    c = dfs.getContentSummary(foo);
+    c = getDFS().getContentSummary(foo);
     assertEquals(c.getSpaceConsumed(), ds);

     // append several blocks
-    DFSTestUtil.appendFile(dfs, bar, BLOCKSIZE * 3 + BLOCKSIZE / 8);
+    DFSTestUtil.appendFile(getDFS(), bar, BLOCKSIZE * 3 + BLOCKSIZE / 8);
     currentFileLen += (BLOCKSIZE * 3 + BLOCKSIZE / 8);

     quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
@@ -144,7 +168,7 @@ public class TestDiskspaceQuotaUpdate {
     ds = quota.getStorageSpace();
     assertEquals(2, ns); // foo and bar
     assertEquals(currentFileLen * REPLICATION, ds);
-    c = dfs.getContentSummary(foo);
+    c = getDFS().getContentSummary(foo);
     assertEquals(c.getSpaceConsumed(), ds);
   }

@@ -154,16 +178,18 @@ public class TestDiskspaceQuotaUpdate {
    */
   @Test (timeout=60000)
   public void testUpdateQuotaForFSync() throws Exception {
-    final Path foo = new Path("/foo");
+    final Path foo =
+        new Path(getParent(GenericTestUtils.getMethodName()), "foo");
     final Path bar = new Path(foo, "bar");
-    DFSTestUtil.createFile(dfs, bar, BLOCKSIZE, REPLICATION, 0L);
-    dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
+    DFSTestUtil.createFile(getDFS(), bar, BLOCKSIZE, REPLICATION, 0L);
+    getDFS().setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);

-    FSDataOutputStream out = dfs.append(bar);
+    FSDataOutputStream out = getDFS().append(bar);
     out.write(new byte[BLOCKSIZE / 4]);
     ((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));

-    INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory();
+    INodeDirectory fooNode =
+        getFSDirectory().getINode4Write(foo.toString()).asDirectory();
     QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature()
         .getSpaceConsumed();
     long ns = quota.getNameSpace();
@@ -174,7 +200,7 @@ public class TestDiskspaceQuotaUpdate {
     out.write(new byte[BLOCKSIZE / 4]);
     out.close();

-    fooNode = fsdir.getINode4Write(foo.toString()).asDirectory();
+    fooNode = getFSDirectory().getINode4Write(foo.toString()).asDirectory();
     quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
     ns = quota.getNameSpace();
     ds = quota.getStorageSpace();
@@ -182,7 +208,7 @@ public class TestDiskspaceQuotaUpdate {
     assertEquals((BLOCKSIZE + BLOCKSIZE / 2) * REPLICATION, ds);

     // append another block
-    DFSTestUtil.appendFile(dfs, bar, BLOCKSIZE);
+    DFSTestUtil.appendFile(getDFS(), bar, BLOCKSIZE);

     quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
     ns = quota.getNameSpace();
@@ -196,21 +222,21 @@ public class TestDiskspaceQuotaUpdate {
    */
   @Test (timeout=60000)
   public void testAppendOverStorageQuota() throws Exception {
-    final Path dir = new Path("/TestAppendOverQuota");
+    final Path dir = getParent(GenericTestUtils.getMethodName());
     final Path file = new Path(dir, "file");

     // create partial block file
-    dfs.mkdirs(dir);
-    DFSTestUtil.createFile(dfs, file, BLOCKSIZE/2, REPLICATION, seed);
+    getDFS().mkdirs(dir);
+    DFSTestUtil.createFile(getDFS(), file, BLOCKSIZE/2, REPLICATION, seed);

     // lower quota to cause exception when appending to partial block
-    dfs.setQuota(dir, Long.MAX_VALUE - 1, 1);
-    final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString())
-        .asDirectory();
+    getDFS().setQuota(dir, Long.MAX_VALUE - 1, 1);
+    final INodeDirectory dirNode =
+        getFSDirectory().getINode4Write(dir.toString()).asDirectory();
     final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
         .getSpaceConsumed().getStorageSpace();
     try {
-      DFSTestUtil.appendFile(dfs, file, BLOCKSIZE);
+      DFSTestUtil.appendFile(getDFS(), file, BLOCKSIZE);
       Assert.fail("append didn't fail");
     } catch (DSQuotaExceededException e) {
       // ignore
@@ -218,7 +244,7 @@ public class TestDiskspaceQuotaUpdate {

     LeaseManager lm = cluster.getNamesystem().getLeaseManager();
     // check that the file exists, isn't UC, and has no dangling lease
-    INodeFile inode = fsdir.getINode(file.toString()).asFile();
+    INodeFile inode = getFSDirectory().getINode(file.toString()).asFile();
     Assert.assertNotNull(inode);
     Assert.assertFalse("should not be UC", inode.isUnderConstruction());
     Assert.assertNull("should not have a lease", lm.getLease(inode));
@@ -227,7 +253,7 @@ public class TestDiskspaceQuotaUpdate {
         .getSpaceConsumed().getStorageSpace();
     assertEquals(spaceUsed, newSpaceUsed);
     // make sure edits aren't corrupted
-    dfs.recoverLease(file);
+    getDFS().recoverLease(file);
     cluster.restartNameNodes();
   }

@@ -237,23 +263,23 @@ public class TestDiskspaceQuotaUpdate {
    */
   @Test (timeout=60000)
   public void testAppendOverTypeQuota() throws Exception {
-    final Path dir = new Path("/TestAppendOverTypeQuota");
+    final Path dir = getParent(GenericTestUtils.getMethodName());
     final Path file = new Path(dir, "file");

     // create partial block file
-    dfs.mkdirs(dir);
+    getDFS().mkdirs(dir);
     // set the storage policy on dir
-    dfs.setStoragePolicy(dir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
-    DFSTestUtil.createFile(dfs, file, BLOCKSIZE/2, REPLICATION, seed);
+    getDFS().setStoragePolicy(dir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+    DFSTestUtil.createFile(getDFS(), file, BLOCKSIZE/2, REPLICATION, seed);

     // set quota of SSD to 1L
-    dfs.setQuotaByStorageType(dir, StorageType.SSD, 1L);
-    final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString())
-        .asDirectory();
+    getDFS().setQuotaByStorageType(dir, StorageType.SSD, 1L);
+    final INodeDirectory dirNode =
+        getFSDirectory().getINode4Write(dir.toString()).asDirectory();
     final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
         .getSpaceConsumed().getStorageSpace();
     try {
-      DFSTestUtil.appendFile(dfs, file, BLOCKSIZE);
+      DFSTestUtil.appendFile(getDFS(), file, BLOCKSIZE);
       Assert.fail("append didn't fail");
     } catch (QuotaByStorageTypeExceededException e) {
       //ignore
@@ -261,7 +287,7 @@ public class TestDiskspaceQuotaUpdate {

     // check that the file exists, isn't UC, and has no dangling lease
     LeaseManager lm = cluster.getNamesystem().getLeaseManager();
-    INodeFile inode = fsdir.getINode(file.toString()).asFile();
+    INodeFile inode = getFSDirectory().getINode(file.toString()).asFile();
     Assert.assertNotNull(inode);
     Assert.assertFalse("should not be UC", inode.isUnderConstruction());
     Assert.assertNull("should not have a lease", lm.getLease(inode));
@@ -270,7 +296,7 @@ public class TestDiskspaceQuotaUpdate {
         .getSpaceConsumed().getStorageSpace();
     assertEquals(spaceUsed, newSpaceUsed);
     // make sure edits aren't corrupted
-    dfs.recoverLease(file);
+    getDFS().recoverLease(file);
     cluster.restartNameNodes();
   }

@@ -279,21 +305,21 @@ public class TestDiskspaceQuotaUpdate {
    */
   @Test (timeout=60000)
   public void testTruncateOverQuota() throws Exception {
-    final Path dir = new Path("/TestTruncateOverquota");
+    final Path dir = getParent(GenericTestUtils.getMethodName());
     final Path file = new Path(dir, "file");

     // create partial block file
-    dfs.mkdirs(dir);
-    DFSTestUtil.createFile(dfs, file, BLOCKSIZE/2, REPLICATION, seed);
+    getDFS().mkdirs(dir);
+    DFSTestUtil.createFile(getDFS(), file, BLOCKSIZE/2, REPLICATION, seed);

     // lower quota to cause exception when appending to partial block
-    dfs.setQuota(dir, Long.MAX_VALUE - 1, 1);
-    final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString())
-        .asDirectory();
+    getDFS().setQuota(dir, Long.MAX_VALUE - 1, 1);
+    final INodeDirectory dirNode =
+        getFSDirectory().getINode4Write(dir.toString()).asDirectory();
     final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
         .getSpaceConsumed().getStorageSpace();
     try {
-      dfs.truncate(file, BLOCKSIZE / 2 - 1);
+      getDFS().truncate(file, BLOCKSIZE / 2 - 1);
       Assert.fail("truncate didn't fail");
     } catch (RemoteException e) {
       assertTrue(e.getClassName().contains("DSQuotaExceededException"));
@@ -301,7 +327,7 @@ public class TestDiskspaceQuotaUpdate {

     // check that the file exists, isn't UC, and has no dangling lease
     LeaseManager lm = cluster.getNamesystem().getLeaseManager();
-    INodeFile inode = fsdir.getINode(file.toString()).asFile();
+    INodeFile inode = getFSDirectory().getINode(file.toString()).asFile();
     Assert.assertNotNull(inode);
     Assert.assertFalse("should not be UC", inode.isUnderConstruction());
     Assert.assertNull("should not have a lease", lm.getLease(inode));
@@ -310,7 +336,7 @@ public class TestDiskspaceQuotaUpdate {
         .getSpaceConsumed().getStorageSpace();
     assertEquals(spaceUsed, newSpaceUsed);
     // make sure edits aren't corrupted
-    dfs.recoverLease(file);
+    getDFS().recoverLease(file);
     cluster.restartNameNodes();
   }

@@ -320,33 +346,34 @@
   @Test
   public void testQuotaInitialization() throws Exception {
     final int size = 500;
-    Path testDir = new Path("/testDir");
+    Path testDir =
+        new Path(getParent(GenericTestUtils.getMethodName()), "testDir");
     long expectedSize = 3 * BLOCKSIZE + BLOCKSIZE/2;
-    dfs.mkdirs(testDir);
-    dfs.setQuota(testDir, size*4, expectedSize*size*2);
+    getDFS().mkdirs(testDir);
+    getDFS().setQuota(testDir, size*4, expectedSize*size*2);

     Path[] testDirs = new Path[size];
     for (int i = 0; i < size; i++) {
       testDirs[i] = new Path(testDir, "sub" + i);
-      dfs.mkdirs(testDirs[i]);
-      dfs.setQuota(testDirs[i], 100, 1000000);
-      DFSTestUtil.createFile(dfs, new Path(testDirs[i], "a"), expectedSize,
+      getDFS().mkdirs(testDirs[i]);
+      getDFS().setQuota(testDirs[i], 100, 1000000);
+      DFSTestUtil.createFile(getDFS(), new Path(testDirs[i], "a"), expectedSize,
           (short)1, 1L);
     }

     // Directly access the name system to obtain the current cached usage.
-    INodeDirectory root = fsdir.getRoot();
+    INodeDirectory root = getFSDirectory().getRoot();
     HashMap<String, Long> nsMap = new HashMap<String, Long>();
     HashMap<String, Long> dsMap = new HashMap<String, Long>();
     scanDirsWithQuota(root, nsMap, dsMap, false);

-    fsdir.updateCountForQuota(1);
+    getFSDirectory().updateCountForQuota(1);
     scanDirsWithQuota(root, nsMap, dsMap, true);

-    fsdir.updateCountForQuota(2);
+    getFSDirectory().updateCountForQuota(2);
     scanDirsWithQuota(root, nsMap, dsMap, true);

-    fsdir.updateCountForQuota(4);
+    getFSDirectory().updateCountForQuota(4);
     scanDirsWithQuota(root, nsMap, dsMap, true);
   }

@@ -372,4 +399,130 @@ public class TestDiskspaceQuotaUpdate {
       }
     }
   }
+
+  /**
+   * Test that the cached quota stays correct between the COMMIT
+   * and COMPLETE block steps, even if the replication factor is
+   * changed during this time.
+   */
+  @Test (timeout=60000)
+  public void testQuotaIssuesWhileCommitting() throws Exception {
+    // We want a one-DN cluster so that we can force a lack of
+    // commit by only instrumenting a single DN; we kill the other 3
+    List<MiniDFSCluster.DataNodeProperties> dnprops = new ArrayList<>();
+    try {
+      for (int i = REPLICATION - 1; i > 0; i--) {
+        dnprops.add(cluster.stopDataNode(i));
+      }
+
+      DatanodeProtocolClientSideTranslatorPB nnSpy =
+          InternalDataNodeTestUtils.spyOnBposToNN(
+              cluster.getDataNodes().get(0), cluster.getNameNode());
+
+      testQuotaIssuesWhileCommittingHelper(nnSpy, (short) 1, (short) 4);
+      testQuotaIssuesWhileCommittingHelper(nnSpy, (short) 4, (short) 1);
+
+      // Don't actually change replication; just check that the sizes
+      // agree during the commit period
+      testQuotaIssuesWhileCommittingHelper(nnSpy, (short) 1, (short) 1);
+    } finally {
+      for (MiniDFSCluster.DataNodeProperties dnprop : dnprops) {
+        cluster.restartDataNode(dnprop, true);
+      }
+      cluster.waitActive();
+    }
+  }
+
+  private void testQuotaIssuesWhileCommittingHelper(
+      DatanodeProtocolClientSideTranslatorPB nnSpy,
+      final short initialReplication, final short finalReplication)
+      throws Exception {
+    final String logStmt =
+        "BUG: Inconsistent storagespace for directory";
+    final Path dir = new Path(getParent(GenericTestUtils.getMethodName()),
+        String.format("%d-%d", initialReplication, finalReplication));
+    final Path file = new Path(dir, "testfile");
+
+    LogCapturer logs = LogCapturer.captureLogs(NameNode.LOG);
+
+    Mockito.doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        if (finalReplication != initialReplication) {
+          getDFS().setReplication(file, finalReplication);
+        }
+        // Call getContentSummary before the DN can notify the NN
+        // that the block has been received to check for discrepancy
+        getDFS().getContentSummary(dir);
+        invocation.callRealMethod();
+        return null;
+      }
+    }).when(nnSpy).blockReceivedAndDeleted(
+        Mockito.<DatanodeRegistration>anyObject(),
+        Mockito.anyString(),
+        Mockito.<StorageReceivedDeletedBlocks[]>anyObject()
+        );
+
+    getDFS().mkdirs(dir);
+    getDFS().setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
+
+    DFSTestUtil.createFile(getDFS(), file, BLOCKSIZE/2, initialReplication, 1L);
+
+    // Also check for discrepancy after completing the file
+    getDFS().getContentSummary(dir);
+    assertFalse(logs.getOutput().contains(logStmt));
+  }
+
+  /**
+   * Test that the cached quota remains correct when the block has been
+   * written to but not yet committed, even if the replication factor
+   * is updated during this time.
+   */
+  private void testQuotaIssuesBeforeCommitting(short initialReplication,
+      short finalReplication) throws Exception {
+    final String logStmt =
+        "BUG: Inconsistent storagespace for directory";
+    final Path dir = new Path(getParent(GenericTestUtils.getMethodName()),
+        String.format("%d-%d", initialReplication, finalReplication));
+    final Path file = new Path(dir, "testfile");
+
+    LogCapturer logs = LogCapturer.captureLogs(NameNode.LOG);
+
+    getDFS().mkdirs(dir);
+    getDFS().setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
+
+    FSDataOutputStream out =
+        TestFileCreation.createFile(getDFS(), file, initialReplication);
+    TestFileCreation.writeFile(out, BLOCKSIZE / 2);
+    out.hflush();
+
+    getDFS().getContentSummary(dir);
+    if (finalReplication != initialReplication) {
+      // While the block is visible to the NN but has not yet been committed,
+      // change the replication
+      getDFS().setReplication(file, finalReplication);
+    }
+
+    out.close();
+
+    getDFS().getContentSummary(dir);
+    assertFalse(logs.getOutput().contains(logStmt));
+  }
+
+  @Test (timeout=60000)
+  public void testCachedComputedSizesAgreeBeforeCommitting() throws Exception {
+    // Don't actually change replication; just check that the sizes
+    // agree before the commit period
+    testQuotaIssuesBeforeCommitting((short)1, (short)1);
+  }
+
+  @Test (timeout=60000)
+  public void testDecreaseReplicationBeforeCommitting() throws Exception {
+    testQuotaIssuesBeforeCommitting((short)4, (short)1);
+  }
+
+  @Test (timeout=60000)
+  public void testIncreaseReplicationBeforeCommitting() throws Exception {
+    testQuotaIssuesBeforeCommitting((short)1, (short)4);
+  }
 }
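The new tests create the race window deliberately: a Mockito spy on the DataNode-to-NameNode translator runs getContentSummary() (whose quota verification logs "BUG: Inconsistent storagespace for directory" on a mismatch) before letting blockReceivedAndDeleted() through, so the check executes while the last block is committed but not yet complete. The pattern in isolation, with a hypothetical Notifier interface rather than the HDFS test utilities:

    import org.mockito.Mockito;

    public class SpyWindowSketch {
      interface Notifier {                 // hypothetical stand-in for the DN->NN protocol
        void blockReceived(String blockId);
      }

      static class RealNotifier implements Notifier {
        @Override
        public void blockReceived(String blockId) {
          System.out.println("NN processes received block " + blockId);
        }
      }

      public static void main(String[] args) {
        Notifier spy = Mockito.spy(new RealNotifier());
        Mockito.doAnswer(invocation -> {
          // Runs while the block is committed but not yet COMPLETE:
          // the place to assert that cached and computed usage agree.
          System.out.println("checking quota consistency inside the window");
          return invocation.callRealMethod();
        }).when(spy).blockReceived(Mockito.anyString());

        spy.blockReceived("blk_1"); // the answer fires first, then the real method
      }
    }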