HDFS-10843. Update space quota when a UC block is completed rather than committed. Contributed by Erik Krogen.
(cherry picked from commit a5bb88c8e0
)
This commit is contained in:
parent
2a1e48bd2f
commit
4b5806217e
|
@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBloc
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
||||||
|
@ -622,12 +623,13 @@ public class BlockManager {
|
||||||
*
|
*
|
||||||
* @param bc block collection
|
* @param bc block collection
|
||||||
* @param commitBlock - contains client reported block length and generation
|
* @param commitBlock - contains client reported block length and generation
|
||||||
|
* @param iip - INodes in path to bc
|
||||||
* @return true if the last block is changed to committed state.
|
* @return true if the last block is changed to committed state.
|
||||||
* @throws IOException if the block does not have at least a minimal number
|
* @throws IOException if the block does not have at least a minimal number
|
||||||
* of replicas reported from data-nodes.
|
* of replicas reported from data-nodes.
|
||||||
*/
|
*/
|
||||||
public boolean commitOrCompleteLastBlock(BlockCollection bc,
|
public boolean commitOrCompleteLastBlock(BlockCollection bc,
|
||||||
Block commitBlock) throws IOException {
|
Block commitBlock, INodesInPath iip) throws IOException {
|
||||||
if(commitBlock == null)
|
if(commitBlock == null)
|
||||||
return false; // not committing, this is a block allocation retry
|
return false; // not committing, this is a block allocation retry
|
||||||
BlockInfoContiguous lastBlock = bc.getLastBlock();
|
BlockInfoContiguous lastBlock = bc.getLastBlock();
|
||||||
|
@ -639,7 +641,7 @@ public class BlockManager {
|
||||||
final boolean b = commitBlock(
|
final boolean b = commitBlock(
|
||||||
(BlockInfoContiguousUnderConstruction) lastBlock, commitBlock);
|
(BlockInfoContiguousUnderConstruction) lastBlock, commitBlock);
|
||||||
if(countNodes(lastBlock).liveReplicas() >= minReplication)
|
if(countNodes(lastBlock).liveReplicas() >= minReplication)
|
||||||
completeBlock(bc, bc.numBlocks()-1, false);
|
completeBlock(bc, bc.numBlocks()-1, iip, false);
|
||||||
return b;
|
return b;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -647,11 +649,13 @@ public class BlockManager {
|
||||||
* Convert a specified block of the file to a complete block.
|
* Convert a specified block of the file to a complete block.
|
||||||
* @param bc file
|
* @param bc file
|
||||||
* @param blkIndex block index in the file
|
* @param blkIndex block index in the file
|
||||||
|
* @param iip - INodes in path to file containing curBlock; if null,
|
||||||
|
* this will be resolved internally
|
||||||
* @throws IOException if the block does not have at least a minimal number
|
* @throws IOException if the block does not have at least a minimal number
|
||||||
* of replicas reported from data-nodes.
|
* of replicas reported from data-nodes.
|
||||||
*/
|
*/
|
||||||
private BlockInfoContiguous completeBlock(final BlockCollection bc,
|
private BlockInfoContiguous completeBlock(final BlockCollection bc,
|
||||||
final int blkIndex, boolean force) throws IOException {
|
final int blkIndex, INodesInPath iip, boolean force) throws IOException {
|
||||||
if(blkIndex < 0)
|
if(blkIndex < 0)
|
||||||
return null;
|
return null;
|
||||||
BlockInfoContiguous curBlock = bc.getBlocks()[blkIndex];
|
BlockInfoContiguous curBlock = bc.getBlocks()[blkIndex];
|
||||||
|
@ -666,7 +670,7 @@ public class BlockManager {
|
||||||
if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED)
|
if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED)
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
"Cannot complete block: block has not been COMMITTED by the client");
|
"Cannot complete block: block has not been COMMITTED by the client");
|
||||||
BlockInfoContiguous completeBlock = ucBlock.convertToCompleteBlock();
|
BlockInfoContiguous completeBlock = convertToCompleteBlock(ucBlock, iip);
|
||||||
// replace penultimate block in file
|
// replace penultimate block in file
|
||||||
bc.setBlock(blkIndex, completeBlock);
|
bc.setBlock(blkIndex, completeBlock);
|
||||||
|
|
||||||
|
@ -685,15 +689,34 @@ public class BlockManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
private BlockInfoContiguous completeBlock(final BlockCollection bc,
|
private BlockInfoContiguous completeBlock(final BlockCollection bc,
|
||||||
final BlockInfoContiguous block, boolean force) throws IOException {
|
final BlockInfoContiguous block, INodesInPath iip, boolean force)
|
||||||
|
throws IOException {
|
||||||
BlockInfoContiguous[] fileBlocks = bc.getBlocks();
|
BlockInfoContiguous[] fileBlocks = bc.getBlocks();
|
||||||
for(int idx = 0; idx < fileBlocks.length; idx++)
|
for(int idx = 0; idx < fileBlocks.length; idx++)
|
||||||
if(fileBlocks[idx] == block) {
|
if(fileBlocks[idx] == block) {
|
||||||
return completeBlock(bc, idx, force);
|
return completeBlock(bc, idx, iip, force);
|
||||||
}
|
}
|
||||||
return block;
|
return block;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert a specified block of the file to a complete block.
|
||||||
|
* Skips validity checking and safe mode block total updates; use
|
||||||
|
* {@link BlockManager#completeBlock} to include these.
|
||||||
|
* @param curBlock - block to be completed
|
||||||
|
* @param iip - INodes in path to file containing curBlock; if null,
|
||||||
|
* this will be resolved internally
|
||||||
|
* @throws IOException if the block does not have at least a minimal number
|
||||||
|
* of replicas reported from data-nodes.
|
||||||
|
*/
|
||||||
|
private BlockInfoContiguous convertToCompleteBlock(
|
||||||
|
BlockInfoContiguousUnderConstruction curBlock, INodesInPath iip)
|
||||||
|
throws IOException {
|
||||||
|
BlockInfoContiguous complete = curBlock.convertToCompleteBlock();
|
||||||
|
namesystem.getFSDirectory().updateSpaceForCompleteBlock(curBlock, iip);
|
||||||
|
return complete;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Force the given block in the given file to be marked as complete,
|
* Force the given block in the given file to be marked as complete,
|
||||||
* regardless of whether enough replicas are present. This is necessary
|
* regardless of whether enough replicas are present. This is necessary
|
||||||
|
@ -702,7 +725,7 @@ public class BlockManager {
|
||||||
public BlockInfoContiguous forceCompleteBlock(final BlockCollection bc,
|
public BlockInfoContiguous forceCompleteBlock(final BlockCollection bc,
|
||||||
final BlockInfoContiguousUnderConstruction block) throws IOException {
|
final BlockInfoContiguousUnderConstruction block) throws IOException {
|
||||||
block.commitBlock(block);
|
block.commitBlock(block);
|
||||||
return completeBlock(bc, block, true);
|
return completeBlock(bc, block, null, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -2525,7 +2548,7 @@ public class BlockManager {
|
||||||
int numCurrentReplica = countLiveNodes(storedBlock);
|
int numCurrentReplica = countLiveNodes(storedBlock);
|
||||||
if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
|
if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
|
||||||
&& numCurrentReplica >= minReplication) {
|
&& numCurrentReplica >= minReplication) {
|
||||||
completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
|
completeBlock(storedBlock.getBlockCollection(), storedBlock, null, false);
|
||||||
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
|
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
|
||||||
// check whether safe replication is reached for the block
|
// check whether safe replication is reached for the block
|
||||||
// only complete blocks are counted towards that.
|
// only complete blocks are counted towards that.
|
||||||
|
@ -2599,7 +2622,7 @@ public class BlockManager {
|
||||||
|
|
||||||
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
|
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
|
||||||
numLiveReplicas >= minReplication) {
|
numLiveReplicas >= minReplication) {
|
||||||
storedBlock = completeBlock(bc, storedBlock, false);
|
storedBlock = completeBlock(bc, storedBlock, null, false);
|
||||||
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
|
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
|
||||||
// check whether safe replication is reached for the block
|
// check whether safe replication is reached for the block
|
||||||
// only complete blocks are counted towards that
|
// only complete blocks are counted towards that
|
||||||
|
|
|
@ -382,13 +382,13 @@ public class FSDirAttrOp {
|
||||||
}
|
}
|
||||||
INodeFile file = inode.asFile();
|
INodeFile file = inode.asFile();
|
||||||
final short oldBR = file.getBlockReplication();
|
final short oldBR = file.getBlockReplication();
|
||||||
|
long size = file.computeFileSize(true, true);
|
||||||
|
|
||||||
// before setFileReplication, check for increasing block replication.
|
// before setFileReplication, check for increasing block replication.
|
||||||
// if replication > oldBR, then newBR == replication.
|
// if replication > oldBR, then newBR == replication.
|
||||||
// if replication < oldBR, we don't know newBR yet.
|
// if replication < oldBR, we don't know newBR yet.
|
||||||
if (replication > oldBR) {
|
if (replication > oldBR) {
|
||||||
long dsDelta = file.storagespaceConsumed()/oldBR;
|
fsd.updateCount(iip, 0L, size, oldBR, replication, true);
|
||||||
fsd.updateCount(iip, 0L, dsDelta, oldBR, replication, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
file.setFileReplication(replication, iip.getLatestSnapshotId());
|
file.setFileReplication(replication, iip.getLatestSnapshotId());
|
||||||
|
@ -396,8 +396,7 @@ public class FSDirAttrOp {
|
||||||
final short newBR = file.getBlockReplication();
|
final short newBR = file.getBlockReplication();
|
||||||
// check newBR < oldBR case.
|
// check newBR < oldBR case.
|
||||||
if (newBR < oldBR) {
|
if (newBR < oldBR) {
|
||||||
long dsDelta = file.storagespaceConsumed()/newBR;
|
fsd.updateCount(iip, 0L, size, oldBR, newBR, true);
|
||||||
fsd.updateCount(iip, 0L, dsDelta, oldBR, newBR, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (blockRepls != null) {
|
if (blockRepls != null) {
|
||||||
|
|
|
@ -737,6 +737,31 @@ public class FSDirectory implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the cached quota space for a block that is being completed.
|
||||||
|
* Must only be called once, as the block is being completed.
|
||||||
|
* @param completeBlk - Completed block for which to update space
|
||||||
|
* @param inodes - INodes in path to file containing completeBlk; if null
|
||||||
|
* this will be resolved internally
|
||||||
|
*/
|
||||||
|
public void updateSpaceForCompleteBlock(BlockInfoContiguous completeBlk,
|
||||||
|
INodesInPath inodes) throws IOException {
|
||||||
|
assert namesystem.hasWriteLock();
|
||||||
|
INodesInPath iip = inodes != null ? inodes :
|
||||||
|
INodesInPath.fromINode((INodeFile) completeBlk.getBlockCollection());
|
||||||
|
INodeFile fileINode = iip.getLastINode().asFile();
|
||||||
|
// Adjust disk space consumption if required
|
||||||
|
final long diff =
|
||||||
|
fileINode.getPreferredBlockSize() - completeBlk.getNumBytes();
|
||||||
|
if (diff > 0) {
|
||||||
|
try {
|
||||||
|
updateSpaceConsumed(iip, 0, -diff, fileINode.getFileReplication());
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Unexpected exception while updating disk space.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public EnumCounters<StorageType> getStorageTypeDeltas(byte storagePolicyID,
|
public EnumCounters<StorageType> getStorageTypeDeltas(byte storagePolicyID,
|
||||||
long dsDelta, short oldRep, short newRep) {
|
long dsDelta, short oldRep, short newRep) {
|
||||||
EnumCounters<StorageType> typeSpaceDeltas =
|
EnumCounters<StorageType> typeSpaceDeltas =
|
||||||
|
|
|
@ -4219,19 +4219,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
final INodesInPath iip, final Block commitBlock) throws IOException {
|
final INodesInPath iip, final Block commitBlock) throws IOException {
|
||||||
assert hasWriteLock();
|
assert hasWriteLock();
|
||||||
Preconditions.checkArgument(fileINode.isUnderConstruction());
|
Preconditions.checkArgument(fileINode.isUnderConstruction());
|
||||||
if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) {
|
blockManager.commitOrCompleteLastBlock(fileINode, commitBlock, iip);
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Adjust disk space consumption if required
|
|
||||||
final long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();
|
|
||||||
if (diff > 0) {
|
|
||||||
try {
|
|
||||||
dir.updateSpaceConsumed(iip, 0, -diff, fileINode.getFileReplication());
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Unexpected exception while updating disk space.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void finalizeINodeFileUnderConstruction(String src,
|
private void finalizeINodeFileUnderConstruction(String src,
|
||||||
|
@ -4479,7 +4467,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
/**
|
/**
|
||||||
* @param pendingFile open file that needs to be closed
|
* @param pendingFile open file that needs to be closed
|
||||||
* @param storedBlock last block
|
* @param storedBlock last block
|
||||||
* @return Path of the file that was closed.
|
|
||||||
* @throws IOException on error
|
* @throws IOException on error
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -7261,6 +7248,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @return the FSDirectory. */
|
/** @return the FSDirectory. */
|
||||||
|
@Override
|
||||||
public FSDirectory getFSDirectory() {
|
public FSDirectory getFSDirectory() {
|
||||||
return dir;
|
return dir;
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,6 +37,8 @@ public interface Namesystem extends RwLock, SafeMode {
|
||||||
/** @return the block pool ID */
|
/** @return the block pool ID */
|
||||||
public String getBlockPoolId();
|
public String getBlockPoolId();
|
||||||
|
|
||||||
|
public FSDirectory getFSDirectory();
|
||||||
|
|
||||||
public boolean isInStandbyState();
|
public boolean isInStandbyState();
|
||||||
|
|
||||||
public boolean isGenStampInFuture(Block block);
|
public boolean isGenStampInFuture(Block block);
|
||||||
|
|
|
@ -18,10 +18,16 @@
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
@ -32,58 +38,80 @@ import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.TestFileCreation;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.junit.After;
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||||
|
import org.apache.log4j.Appender;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.apache.log4j.helpers.AppenderAttachableImpl;
|
||||||
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
public class TestDiskspaceQuotaUpdate {
|
public class TestDiskspaceQuotaUpdate {
|
||||||
private static final int BLOCKSIZE = 1024;
|
private static final int BLOCKSIZE = 1024;
|
||||||
private static final short REPLICATION = 4;
|
private static final short REPLICATION = 4;
|
||||||
static final long seed = 0L;
|
static final long seed = 0L;
|
||||||
private static final Path dir = new Path("/TestQuotaUpdate");
|
private static final Path BASE_DIR = new Path("/TestQuotaUpdate");
|
||||||
|
|
||||||
private Configuration conf;
|
private static Configuration conf;
|
||||||
private MiniDFSCluster cluster;
|
private static MiniDFSCluster cluster;
|
||||||
private FSDirectory fsdir;
|
|
||||||
private DistributedFileSystem dfs;
|
|
||||||
|
|
||||||
@Before
|
@BeforeClass
|
||||||
public void setUp() throws Exception {
|
public static void setUp() throws Exception {
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
|
||||||
.build();
|
.build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
fsdir = cluster.getNamesystem().getFSDirectory();
|
|
||||||
dfs = cluster.getFileSystem();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@AfterClass
|
||||||
public void tearDown() throws Exception {
|
public static void tearDown() throws Exception {
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Path getParent(String testName) {
|
||||||
|
return new Path(BASE_DIR, testName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private FSDirectory getFSDirectory() {
|
||||||
|
return cluster.getNamesystem().getFSDirectory();
|
||||||
|
}
|
||||||
|
|
||||||
|
private DistributedFileSystem getDFS() throws IOException {
|
||||||
|
return cluster.getFileSystem();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test if the quota can be correctly updated for create file
|
* Test if the quota can be correctly updated for create file
|
||||||
*/
|
*/
|
||||||
@Test (timeout=60000)
|
@Test (timeout=60000)
|
||||||
public void testQuotaUpdateWithFileCreate() throws Exception {
|
public void testQuotaUpdateWithFileCreate() throws Exception {
|
||||||
final Path foo = new Path(dir, "foo");
|
final Path foo =
|
||||||
|
new Path(getParent(GenericTestUtils.getMethodName()), "foo");
|
||||||
Path createdFile = new Path(foo, "created_file.data");
|
Path createdFile = new Path(foo, "created_file.data");
|
||||||
dfs.mkdirs(foo);
|
getDFS().mkdirs(foo);
|
||||||
dfs.setQuota(foo, Long.MAX_VALUE-1, Long.MAX_VALUE-1);
|
getDFS().setQuota(foo, Long.MAX_VALUE-1, Long.MAX_VALUE-1);
|
||||||
long fileLen = BLOCKSIZE * 2 + BLOCKSIZE / 2;
|
long fileLen = BLOCKSIZE * 2 + BLOCKSIZE / 2;
|
||||||
DFSTestUtil.createFile(dfs, createdFile, BLOCKSIZE / 16,
|
DFSTestUtil.createFile(getDFS(), createdFile, BLOCKSIZE / 16,
|
||||||
fileLen, BLOCKSIZE, REPLICATION, seed);
|
fileLen, BLOCKSIZE, REPLICATION, seed);
|
||||||
INode fnode = fsdir.getINode4Write(foo.toString());
|
INode fnode = getFSDirectory().getINode4Write(foo.toString());
|
||||||
assertTrue(fnode.isDirectory());
|
assertTrue(fnode.isDirectory());
|
||||||
assertTrue(fnode.isQuotaSet());
|
assertTrue(fnode.isQuotaSet());
|
||||||
QuotaCounts cnt = fnode.asDirectory().getDirectoryWithQuotaFeature()
|
QuotaCounts cnt = fnode.asDirectory().getDirectoryWithQuotaFeature()
|
||||||
|
@ -97,18 +125,20 @@ public class TestDiskspaceQuotaUpdate {
|
||||||
*/
|
*/
|
||||||
@Test (timeout=60000)
|
@Test (timeout=60000)
|
||||||
public void testUpdateQuotaForAppend() throws Exception {
|
public void testUpdateQuotaForAppend() throws Exception {
|
||||||
final Path foo = new Path(dir ,"foo");
|
final Path foo =
|
||||||
|
new Path(getParent(GenericTestUtils.getMethodName()), "foo");
|
||||||
final Path bar = new Path(foo, "bar");
|
final Path bar = new Path(foo, "bar");
|
||||||
long currentFileLen = BLOCKSIZE;
|
long currentFileLen = BLOCKSIZE;
|
||||||
DFSTestUtil.createFile(dfs, bar, currentFileLen, REPLICATION, seed);
|
DFSTestUtil.createFile(getDFS(), bar, currentFileLen, REPLICATION, seed);
|
||||||
dfs.setQuota(foo, Long.MAX_VALUE-1, Long.MAX_VALUE-1);
|
getDFS().setQuota(foo, Long.MAX_VALUE-1, Long.MAX_VALUE-1);
|
||||||
|
|
||||||
// append half of the block data, the previous file length is at block
|
// append half of the block data, the previous file length is at block
|
||||||
// boundary
|
// boundary
|
||||||
DFSTestUtil.appendFile(dfs, bar, BLOCKSIZE / 2);
|
DFSTestUtil.appendFile(getDFS(), bar, BLOCKSIZE / 2);
|
||||||
currentFileLen += (BLOCKSIZE / 2);
|
currentFileLen += (BLOCKSIZE / 2);
|
||||||
|
|
||||||
INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory();
|
INodeDirectory fooNode =
|
||||||
|
getFSDirectory().getINode4Write(foo.toString()).asDirectory();
|
||||||
assertTrue(fooNode.isQuotaSet());
|
assertTrue(fooNode.isQuotaSet());
|
||||||
QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature()
|
QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature()
|
||||||
.getSpaceConsumed();
|
.getSpaceConsumed();
|
||||||
|
@ -116,11 +146,11 @@ public class TestDiskspaceQuotaUpdate {
|
||||||
long ds = quota.getStorageSpace();
|
long ds = quota.getStorageSpace();
|
||||||
assertEquals(2, ns); // foo and bar
|
assertEquals(2, ns); // foo and bar
|
||||||
assertEquals(currentFileLen * REPLICATION, ds);
|
assertEquals(currentFileLen * REPLICATION, ds);
|
||||||
ContentSummary c = dfs.getContentSummary(foo);
|
ContentSummary c = getDFS().getContentSummary(foo);
|
||||||
assertEquals(c.getSpaceConsumed(), ds);
|
assertEquals(c.getSpaceConsumed(), ds);
|
||||||
|
|
||||||
// append another block, the previous file length is not at block boundary
|
// append another block, the previous file length is not at block boundary
|
||||||
DFSTestUtil.appendFile(dfs, bar, BLOCKSIZE);
|
DFSTestUtil.appendFile(getDFS(), bar, BLOCKSIZE);
|
||||||
currentFileLen += BLOCKSIZE;
|
currentFileLen += BLOCKSIZE;
|
||||||
|
|
||||||
quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
|
quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
|
||||||
|
@ -128,11 +158,11 @@ public class TestDiskspaceQuotaUpdate {
|
||||||
ds = quota.getStorageSpace();
|
ds = quota.getStorageSpace();
|
||||||
assertEquals(2, ns); // foo and bar
|
assertEquals(2, ns); // foo and bar
|
||||||
assertEquals(currentFileLen * REPLICATION, ds);
|
assertEquals(currentFileLen * REPLICATION, ds);
|
||||||
c = dfs.getContentSummary(foo);
|
c = getDFS().getContentSummary(foo);
|
||||||
assertEquals(c.getSpaceConsumed(), ds);
|
assertEquals(c.getSpaceConsumed(), ds);
|
||||||
|
|
||||||
// append several blocks
|
// append several blocks
|
||||||
DFSTestUtil.appendFile(dfs, bar, BLOCKSIZE * 3 + BLOCKSIZE / 8);
|
DFSTestUtil.appendFile(getDFS(), bar, BLOCKSIZE * 3 + BLOCKSIZE / 8);
|
||||||
currentFileLen += (BLOCKSIZE * 3 + BLOCKSIZE / 8);
|
currentFileLen += (BLOCKSIZE * 3 + BLOCKSIZE / 8);
|
||||||
|
|
||||||
quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
|
quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
|
||||||
|
@ -140,7 +170,7 @@ public class TestDiskspaceQuotaUpdate {
|
||||||
ds = quota.getStorageSpace();
|
ds = quota.getStorageSpace();
|
||||||
assertEquals(2, ns); // foo and bar
|
assertEquals(2, ns); // foo and bar
|
||||||
assertEquals(currentFileLen * REPLICATION, ds);
|
assertEquals(currentFileLen * REPLICATION, ds);
|
||||||
c = dfs.getContentSummary(foo);
|
c = getDFS().getContentSummary(foo);
|
||||||
assertEquals(c.getSpaceConsumed(), ds);
|
assertEquals(c.getSpaceConsumed(), ds);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,16 +180,18 @@ public class TestDiskspaceQuotaUpdate {
|
||||||
*/
|
*/
|
||||||
@Test (timeout=60000)
|
@Test (timeout=60000)
|
||||||
public void testUpdateQuotaForFSync() throws Exception {
|
public void testUpdateQuotaForFSync() throws Exception {
|
||||||
final Path foo = new Path("/foo");
|
final Path foo =
|
||||||
|
new Path(getParent(GenericTestUtils.getMethodName()), "foo");
|
||||||
final Path bar = new Path(foo, "bar");
|
final Path bar = new Path(foo, "bar");
|
||||||
DFSTestUtil.createFile(dfs, bar, BLOCKSIZE, REPLICATION, 0L);
|
DFSTestUtil.createFile(getDFS(), bar, BLOCKSIZE, REPLICATION, 0L);
|
||||||
dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
|
getDFS().setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
|
||||||
|
|
||||||
FSDataOutputStream out = dfs.append(bar);
|
FSDataOutputStream out = getDFS().append(bar);
|
||||||
out.write(new byte[BLOCKSIZE / 4]);
|
out.write(new byte[BLOCKSIZE / 4]);
|
||||||
((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
|
((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
|
||||||
|
|
||||||
INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory();
|
INodeDirectory fooNode =
|
||||||
|
getFSDirectory().getINode4Write(foo.toString()).asDirectory();
|
||||||
QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature()
|
QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature()
|
||||||
.getSpaceConsumed();
|
.getSpaceConsumed();
|
||||||
long ns = quota.getNameSpace();
|
long ns = quota.getNameSpace();
|
||||||
|
@ -170,7 +202,7 @@ public class TestDiskspaceQuotaUpdate {
|
||||||
out.write(new byte[BLOCKSIZE / 4]);
|
out.write(new byte[BLOCKSIZE / 4]);
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
fooNode = fsdir.getINode4Write(foo.toString()).asDirectory();
|
fooNode = getFSDirectory().getINode4Write(foo.toString()).asDirectory();
|
||||||
quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
|
quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
|
||||||
ns = quota.getNameSpace();
|
ns = quota.getNameSpace();
|
||||||
ds = quota.getStorageSpace();
|
ds = quota.getStorageSpace();
|
||||||
|
@ -178,7 +210,7 @@ public class TestDiskspaceQuotaUpdate {
|
||||||
assertEquals((BLOCKSIZE + BLOCKSIZE / 2) * REPLICATION, ds);
|
assertEquals((BLOCKSIZE + BLOCKSIZE / 2) * REPLICATION, ds);
|
||||||
|
|
||||||
// append another block
|
// append another block
|
||||||
DFSTestUtil.appendFile(dfs, bar, BLOCKSIZE);
|
DFSTestUtil.appendFile(getDFS(), bar, BLOCKSIZE);
|
||||||
|
|
||||||
quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
|
quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed();
|
||||||
ns = quota.getNameSpace();
|
ns = quota.getNameSpace();
|
||||||
|
@ -192,28 +224,28 @@ public class TestDiskspaceQuotaUpdate {
|
||||||
*/
|
*/
|
||||||
@Test (timeout=60000)
|
@Test (timeout=60000)
|
||||||
public void testAppendOverStorageQuota() throws Exception {
|
public void testAppendOverStorageQuota() throws Exception {
|
||||||
final Path dir = new Path("/TestAppendOverQuota");
|
final Path dir = getParent(GenericTestUtils.getMethodName());
|
||||||
final Path file = new Path(dir, "file");
|
final Path file = new Path(dir, "file");
|
||||||
|
|
||||||
// create partial block file
|
// create partial block file
|
||||||
dfs.mkdirs(dir);
|
getDFS().mkdirs(dir);
|
||||||
DFSTestUtil.createFile(dfs, file, BLOCKSIZE/2, REPLICATION, seed);
|
DFSTestUtil.createFile(getDFS(), file, BLOCKSIZE/2, REPLICATION, seed);
|
||||||
|
|
||||||
// lower quota to cause exception when appending to partial block
|
// lower quota to cause exception when appending to partial block
|
||||||
dfs.setQuota(dir, Long.MAX_VALUE - 1, 1);
|
getDFS().setQuota(dir, Long.MAX_VALUE - 1, 1);
|
||||||
final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString())
|
final INodeDirectory dirNode =
|
||||||
.asDirectory();
|
getFSDirectory().getINode4Write(dir.toString()).asDirectory();
|
||||||
final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
|
final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
|
||||||
.getSpaceConsumed().getStorageSpace();
|
.getSpaceConsumed().getStorageSpace();
|
||||||
try {
|
try {
|
||||||
DFSTestUtil.appendFile(dfs, file, BLOCKSIZE);
|
DFSTestUtil.appendFile(getDFS(), file, BLOCKSIZE);
|
||||||
Assert.fail("append didn't fail");
|
Assert.fail("append didn't fail");
|
||||||
} catch (DSQuotaExceededException e) {
|
} catch (DSQuotaExceededException e) {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
|
|
||||||
// check that the file exists, isn't UC, and has no dangling lease
|
// check that the file exists, isn't UC, and has no dangling lease
|
||||||
INodeFile inode = fsdir.getINode(file.toString()).asFile();
|
INodeFile inode = getFSDirectory().getINode(file.toString()).asFile();
|
||||||
Assert.assertNotNull(inode);
|
Assert.assertNotNull(inode);
|
||||||
Assert.assertFalse("should not be UC", inode.isUnderConstruction());
|
Assert.assertFalse("should not be UC", inode.isUnderConstruction());
|
||||||
Assert.assertNull("should not have a lease", cluster.getNamesystem().getLeaseManager().getLeaseByPath(file.toString()));
|
Assert.assertNull("should not have a lease", cluster.getNamesystem().getLeaseManager().getLeaseByPath(file.toString()));
|
||||||
|
@ -222,7 +254,7 @@ public class TestDiskspaceQuotaUpdate {
|
||||||
.getSpaceConsumed().getStorageSpace();
|
.getSpaceConsumed().getStorageSpace();
|
||||||
assertEquals(spaceUsed, newSpaceUsed);
|
assertEquals(spaceUsed, newSpaceUsed);
|
||||||
// make sure edits aren't corrupted
|
// make sure edits aren't corrupted
|
||||||
dfs.recoverLease(file);
|
getDFS().recoverLease(file);
|
||||||
cluster.restartNameNodes();
|
cluster.restartNameNodes();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,30 +264,30 @@ public class TestDiskspaceQuotaUpdate {
|
||||||
*/
|
*/
|
||||||
@Test (timeout=60000)
|
@Test (timeout=60000)
|
||||||
public void testAppendOverTypeQuota() throws Exception {
|
public void testAppendOverTypeQuota() throws Exception {
|
||||||
final Path dir = new Path("/TestAppendOverTypeQuota");
|
final Path dir = getParent(GenericTestUtils.getMethodName());
|
||||||
final Path file = new Path(dir, "file");
|
final Path file = new Path(dir, "file");
|
||||||
|
|
||||||
// create partial block file
|
// create partial block file
|
||||||
dfs.mkdirs(dir);
|
getDFS().mkdirs(dir);
|
||||||
// set the storage policy on dir
|
// set the storage policy on dir
|
||||||
dfs.setStoragePolicy(dir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
|
getDFS().setStoragePolicy(dir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
|
||||||
DFSTestUtil.createFile(dfs, file, BLOCKSIZE/2, REPLICATION, seed);
|
DFSTestUtil.createFile(getDFS(), file, BLOCKSIZE/2, REPLICATION, seed);
|
||||||
|
|
||||||
// set quota of SSD to 1L
|
// set quota of SSD to 1L
|
||||||
dfs.setQuotaByStorageType(dir, StorageType.SSD, 1L);
|
getDFS().setQuotaByStorageType(dir, StorageType.SSD, 1L);
|
||||||
final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString())
|
final INodeDirectory dirNode =
|
||||||
.asDirectory();
|
getFSDirectory().getINode4Write(dir.toString()).asDirectory();
|
||||||
final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
|
final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
|
||||||
.getSpaceConsumed().getStorageSpace();
|
.getSpaceConsumed().getStorageSpace();
|
||||||
try {
|
try {
|
||||||
DFSTestUtil.appendFile(dfs, file, BLOCKSIZE);
|
DFSTestUtil.appendFile(getDFS(), file, BLOCKSIZE);
|
||||||
Assert.fail("append didn't fail");
|
Assert.fail("append didn't fail");
|
||||||
} catch (RemoteException e) {
|
} catch (RemoteException e) {
|
||||||
assertTrue(e.getClassName().contains("QuotaByStorageTypeExceededException"));
|
assertTrue(e.getClassName().contains("QuotaByStorageTypeExceededException"));
|
||||||
}
|
}
|
||||||
|
|
||||||
// check that the file exists, isn't UC, and has no dangling lease
|
// check that the file exists, isn't UC, and has no dangling lease
|
||||||
INodeFile inode = fsdir.getINode(file.toString()).asFile();
|
INodeFile inode = getFSDirectory().getINode(file.toString()).asFile();
|
||||||
Assert.assertNotNull(inode);
|
Assert.assertNotNull(inode);
|
||||||
Assert.assertFalse("should not be UC", inode.isUnderConstruction());
|
Assert.assertFalse("should not be UC", inode.isUnderConstruction());
|
||||||
Assert.assertNull("should not have a lease", cluster.getNamesystem()
|
Assert.assertNull("should not have a lease", cluster.getNamesystem()
|
||||||
|
@ -265,7 +297,7 @@ public class TestDiskspaceQuotaUpdate {
|
||||||
.getSpaceConsumed().getStorageSpace();
|
.getSpaceConsumed().getStorageSpace();
|
||||||
assertEquals(spaceUsed, newSpaceUsed);
|
assertEquals(spaceUsed, newSpaceUsed);
|
||||||
// make sure edits aren't corrupted
|
// make sure edits aren't corrupted
|
||||||
dfs.recoverLease(file);
|
getDFS().recoverLease(file);
|
||||||
cluster.restartNameNodes();
|
cluster.restartNameNodes();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -274,28 +306,28 @@ public class TestDiskspaceQuotaUpdate {
|
||||||
*/
|
*/
|
||||||
@Test (timeout=60000)
|
@Test (timeout=60000)
|
||||||
public void testTruncateOverQuota() throws Exception {
|
public void testTruncateOverQuota() throws Exception {
|
||||||
final Path dir = new Path("/TestTruncateOverquota");
|
final Path dir = getParent(GenericTestUtils.getMethodName());
|
||||||
final Path file = new Path(dir, "file");
|
final Path file = new Path(dir, "file");
|
||||||
|
|
||||||
// create partial block file
|
// create partial block file
|
||||||
dfs.mkdirs(dir);
|
getDFS().mkdirs(dir);
|
||||||
DFSTestUtil.createFile(dfs, file, BLOCKSIZE/2, REPLICATION, seed);
|
DFSTestUtil.createFile(getDFS(), file, BLOCKSIZE/2, REPLICATION, seed);
|
||||||
|
|
||||||
// lower quota to cause exception when appending to partial block
|
// lower quota to cause exception when appending to partial block
|
||||||
dfs.setQuota(dir, Long.MAX_VALUE - 1, 1);
|
getDFS().setQuota(dir, Long.MAX_VALUE - 1, 1);
|
||||||
final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString())
|
final INodeDirectory dirNode =
|
||||||
.asDirectory();
|
getFSDirectory().getINode4Write(dir.toString()).asDirectory();
|
||||||
final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
|
final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
|
||||||
.getSpaceConsumed().getStorageSpace();
|
.getSpaceConsumed().getStorageSpace();
|
||||||
try {
|
try {
|
||||||
dfs.truncate(file, BLOCKSIZE / 2 - 1);
|
getDFS().truncate(file, BLOCKSIZE / 2 - 1);
|
||||||
Assert.fail("truncate didn't fail");
|
Assert.fail("truncate didn't fail");
|
||||||
} catch (RemoteException e) {
|
} catch (RemoteException e) {
|
||||||
assertTrue(e.getClassName().contains("DSQuotaExceededException"));
|
assertTrue(e.getClassName().contains("DSQuotaExceededException"));
|
||||||
}
|
}
|
||||||
|
|
||||||
// check that the file exists, isn't UC, and has no dangling lease
|
// check that the file exists, isn't UC, and has no dangling lease
|
||||||
INodeFile inode = fsdir.getINode(file.toString()).asFile();
|
INodeFile inode = getFSDirectory().getINode(file.toString()).asFile();
|
||||||
Assert.assertNotNull(inode);
|
Assert.assertNotNull(inode);
|
||||||
Assert.assertFalse("should not be UC", inode.isUnderConstruction());
|
Assert.assertFalse("should not be UC", inode.isUnderConstruction());
|
||||||
Assert.assertNull("should not have a lease", cluster.getNamesystem()
|
Assert.assertNull("should not have a lease", cluster.getNamesystem()
|
||||||
|
@ -305,7 +337,136 @@ public class TestDiskspaceQuotaUpdate {
|
||||||
.getSpaceConsumed().getStorageSpace();
|
.getSpaceConsumed().getStorageSpace();
|
||||||
assertEquals(spaceUsed, newSpaceUsed);
|
assertEquals(spaceUsed, newSpaceUsed);
|
||||||
// make sure edits aren't corrupted
|
// make sure edits aren't corrupted
|
||||||
dfs.recoverLease(file);
|
getDFS().recoverLease(file);
|
||||||
cluster.restartNameNodes();
|
cluster.restartNameNodes();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that the cached quota stays correct between the COMMIT
|
||||||
|
* and COMPLETE block steps, even if the replication factor is
|
||||||
|
* changed during this time.
|
||||||
|
*/
|
||||||
|
@Test (timeout=60000)
|
||||||
|
public void testQuotaIssuesWhileCommitting() throws Exception {
|
||||||
|
// We want a one-DN cluster so that we can force a lack of
|
||||||
|
// commit by only instrumenting a single DN; we kill the other 3
|
||||||
|
List<MiniDFSCluster.DataNodeProperties> dnprops = new ArrayList<>();
|
||||||
|
try {
|
||||||
|
for (int i = REPLICATION - 1; i > 0; i--) {
|
||||||
|
dnprops.add(cluster.stopDataNode(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
DatanodeProtocolClientSideTranslatorPB nnSpy =
|
||||||
|
DataNodeTestUtils.spyOnBposToNN(
|
||||||
|
cluster.getDataNodes().get(0), cluster.getNameNode());
|
||||||
|
|
||||||
|
testQuotaIssuesWhileCommittingHelper(nnSpy, (short) 1, (short) 4);
|
||||||
|
testQuotaIssuesWhileCommittingHelper(nnSpy, (short) 4, (short) 1);
|
||||||
|
|
||||||
|
// Don't actually change replication; just check that the sizes
|
||||||
|
// agree during the commit period
|
||||||
|
testQuotaIssuesWhileCommittingHelper(nnSpy, (short) 1, (short) 1);
|
||||||
|
} finally {
|
||||||
|
for (MiniDFSCluster.DataNodeProperties dnprop : dnprops) {
|
||||||
|
cluster.restartDataNode(dnprop, true);
|
||||||
|
}
|
||||||
|
cluster.waitActive();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testQuotaIssuesWhileCommittingHelper(
|
||||||
|
DatanodeProtocolClientSideTranslatorPB nnSpy,
|
||||||
|
final short initialReplication, final short finalReplication)
|
||||||
|
throws Exception {
|
||||||
|
final String logStmt =
|
||||||
|
"BUG: Inconsistent storagespace for directory";
|
||||||
|
final Path dir = new Path(getParent(GenericTestUtils.getMethodName()),
|
||||||
|
String.format("%d-%d", initialReplication, finalReplication));
|
||||||
|
final Path file = new Path(dir, "testfile");
|
||||||
|
|
||||||
|
LogCapturer logs =
|
||||||
|
LogCapturer.captureLogs(GenericTestUtils.toLog4j(NameNode.LOG));
|
||||||
|
|
||||||
|
Mockito.doAnswer(new Answer<Object>() {
|
||||||
|
@Override
|
||||||
|
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
if (finalReplication != initialReplication) {
|
||||||
|
getDFS().setReplication(file, finalReplication);
|
||||||
|
}
|
||||||
|
// Call getContentSummary before the DN can notify the NN
|
||||||
|
// that the block has been received to check for discrepancy
|
||||||
|
getDFS().getContentSummary(dir);
|
||||||
|
invocation.callRealMethod();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).when(nnSpy).blockReceivedAndDeleted(
|
||||||
|
Mockito.<DatanodeRegistration>anyObject(),
|
||||||
|
Mockito.anyString(),
|
||||||
|
Mockito.<StorageReceivedDeletedBlocks[]>anyObject()
|
||||||
|
);
|
||||||
|
|
||||||
|
getDFS().mkdirs(dir);
|
||||||
|
getDFS().setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
|
||||||
|
|
||||||
|
DFSTestUtil.createFile(getDFS(), file, BLOCKSIZE/2, initialReplication, 1L);
|
||||||
|
|
||||||
|
// Also check for discrepancy after completing the file
|
||||||
|
getDFS().getContentSummary(dir);
|
||||||
|
assertFalse(logs.getOutput().contains(logStmt));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that the cached quota remains correct when the block has been
|
||||||
|
* written to but not yet committed, even if the replication factor
|
||||||
|
* is updated during this time.
|
||||||
|
*/
|
||||||
|
private void testQuotaIssuesBeforeCommitting(short initialReplication,
|
||||||
|
short finalReplication) throws Exception {
|
||||||
|
final String logStmt =
|
||||||
|
"BUG: Inconsistent storagespace for directory";
|
||||||
|
final Path dir = new Path(getParent(GenericTestUtils.getMethodName()),
|
||||||
|
String.format("%d-%d", initialReplication, finalReplication));
|
||||||
|
final Path file = new Path(dir, "testfile");
|
||||||
|
|
||||||
|
LogCapturer logs =
|
||||||
|
LogCapturer.captureLogs(GenericTestUtils.toLog4j(NameNode.LOG));
|
||||||
|
|
||||||
|
getDFS().mkdirs(dir);
|
||||||
|
getDFS().setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
|
||||||
|
|
||||||
|
FSDataOutputStream out =
|
||||||
|
TestFileCreation.createFile(getDFS(), file, initialReplication);
|
||||||
|
TestFileCreation.writeFile(out, BLOCKSIZE / 2);
|
||||||
|
out.hflush();
|
||||||
|
|
||||||
|
getDFS().getContentSummary(dir);
|
||||||
|
if (finalReplication != initialReplication) {
|
||||||
|
// While the block is visible to the NN but has not yet been committed,
|
||||||
|
// change the replication
|
||||||
|
getDFS().setReplication(file, finalReplication);
|
||||||
|
}
|
||||||
|
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
getDFS().getContentSummary(dir);
|
||||||
|
assertFalse(logs.getOutput().contains(logStmt));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout=60000)
|
||||||
|
public void testCachedComputedSizesAgreeBeforeCommitting() throws Exception {
|
||||||
|
// Don't actually change replication; just check that the sizes
|
||||||
|
// agree before the commit period
|
||||||
|
testQuotaIssuesBeforeCommitting((short)1, (short)1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout=60000)
|
||||||
|
public void testDecreaseReplicationBeforeCommitting() throws Exception {
|
||||||
|
testQuotaIssuesBeforeCommitting((short)4, (short)1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout=60000)
|
||||||
|
public void testIncreaseReplicationBeforeCommitting() throws Exception {
|
||||||
|
testQuotaIssuesBeforeCommitting((short)1, (short)4);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue