HDFS-7587. Edit log corruption can happen if append fails with a quota violation. Contributed by Jing Zhao.
(cherry picked from commit c7c71cdba5
)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
This commit is contained in:
parent
6dcc79507d
commit
5a5b244648
|
@ -899,6 +899,9 @@ Release 2.7.0 - UNRELEASED
|
|||
HDFS-7943. Append cannot handle the last block with length greater than
|
||||
the preferred block size. (jing9)
|
||||
|
||||
HDFS-7587. Edit log corruption can happen if append fails with a quota
|
||||
violation. (jing9)
|
||||
|
||||
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
|
||||
|
|
|
@ -2672,6 +2672,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
String leaseHolder, String clientMachine, boolean newBlock,
|
||||
boolean writeToEditLog, boolean logRetryCache) throws IOException {
|
||||
final INodeFile file = iip.getLastINode().asFile();
|
||||
final QuotaCounts delta = verifyQuotaForUCBlock(file, iip);
|
||||
|
||||
file.recordModification(iip.getLatestSnapshotId());
|
||||
file.toUnderConstruction(leaseHolder, clientMachine);
|
||||
|
||||
|
@ -2681,10 +2683,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
LocatedBlock ret = null;
|
||||
if (!newBlock) {
|
||||
ret = blockManager.convertLastBlockToUnderConstruction(file, 0);
|
||||
if (ret != null) {
|
||||
// update the quota: use the preferred block size for UC block
|
||||
final long diff = file.getPreferredBlockSize() - ret.getBlockSize();
|
||||
dir.updateSpaceConsumed(iip, 0, diff, file.getBlockReplication());
|
||||
if (ret != null && delta != null) {
|
||||
Preconditions.checkState(delta.getStorageSpace() >= 0,
|
||||
"appending to a block with size larger than the preferred block size");
|
||||
dir.writeLock();
|
||||
try {
|
||||
dir.updateCountNoQuotaCheck(iip, iip.length() - 1, delta);
|
||||
} finally {
|
||||
dir.writeUnlock();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
BlockInfoContiguous lastBlock = file.getLastBlock();
|
||||
|
@ -2700,6 +2707,52 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify quota when using the preferred block size for UC block. This is
|
||||
* usually used by append and truncate
|
||||
* @throws QuotaExceededException when violating the storage quota
|
||||
* @return expected quota usage update. null means no change or no need to
|
||||
* update quota usage later
|
||||
*/
|
||||
private QuotaCounts verifyQuotaForUCBlock(INodeFile file, INodesInPath iip)
|
||||
throws QuotaExceededException {
|
||||
if (!isImageLoaded() || dir.shouldSkipQuotaChecks()) {
|
||||
// Do not check quota if editlog is still being processed
|
||||
return null;
|
||||
}
|
||||
if (file.getLastBlock() != null) {
|
||||
final QuotaCounts delta = computeQuotaDeltaForUCBlock(file);
|
||||
dir.readLock();
|
||||
try {
|
||||
FSDirectory.verifyQuota(iip, iip.length() - 1, delta, null);
|
||||
return delta;
|
||||
} finally {
|
||||
dir.readUnlock();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/** Compute quota change for converting a complete block to a UC block */
|
||||
private QuotaCounts computeQuotaDeltaForUCBlock(INodeFile file) {
|
||||
final QuotaCounts delta = new QuotaCounts.Builder().build();
|
||||
final BlockInfoContiguous lastBlock = file.getLastBlock();
|
||||
if (lastBlock != null) {
|
||||
final long diff = file.getPreferredBlockSize() - lastBlock.getNumBytes();
|
||||
final short repl = file.getBlockReplication();
|
||||
delta.addStorageSpace(diff * repl);
|
||||
final BlockStoragePolicy policy = dir.getBlockStoragePolicySuite()
|
||||
.getPolicy(file.getStoragePolicyID());
|
||||
List<StorageType> types = policy.chooseStorageTypes(repl);
|
||||
for (StorageType t : types) {
|
||||
if (t.supportTypeQuota()) {
|
||||
delta.addTypeSpace(t, diff);
|
||||
}
|
||||
}
|
||||
}
|
||||
return delta;
|
||||
}
|
||||
|
||||
/**
|
||||
* Recover lease;
|
||||
* Immediately revoke the lease of the current lease holder and start lease
|
||||
|
@ -3106,7 +3159,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
// doesn't match up with what we think is the last block. There are
|
||||
// four possibilities:
|
||||
// 1) This is the first block allocation of an append() pipeline
|
||||
// which started appending exactly at a block boundary.
|
||||
// which started appending exactly at or exceeding the block boundary.
|
||||
// In this case, the client isn't passed the previous block,
|
||||
// so it makes the allocateBlock() call with previous=null.
|
||||
// We can distinguish this since the last block of the file
|
||||
|
@ -3131,7 +3184,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
BlockInfoContiguous penultimateBlock = pendingFile.getPenultimateBlock();
|
||||
if (previous == null &&
|
||||
lastBlockInFile != null &&
|
||||
lastBlockInFile.getNumBytes() == pendingFile.getPreferredBlockSize() &&
|
||||
lastBlockInFile.getNumBytes() >= pendingFile.getPreferredBlockSize() &&
|
||||
lastBlockInFile.isComplete()) {
|
||||
// Case 1
|
||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
|
|
|
@ -26,13 +26,18 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -152,8 +157,7 @@ public class TestDiskspaceQuotaUpdate {
|
|||
|
||||
FSDataOutputStream out = dfs.append(bar);
|
||||
out.write(new byte[BLOCKSIZE / 4]);
|
||||
((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet
|
||||
.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
|
||||
((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
|
||||
|
||||
INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory();
|
||||
QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature()
|
||||
|
@ -182,4 +186,87 @@ public class TestDiskspaceQuotaUpdate {
|
|||
assertEquals(2, ns); // foo and bar
|
||||
assertEquals((BLOCKSIZE * 2 + BLOCKSIZE / 2) * REPLICATION, ds);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test append over storage quota does not mark file as UC or create lease
|
||||
*/
|
||||
@Test (timeout=60000)
|
||||
public void testAppendOverStorageQuota() throws Exception {
|
||||
final Path dir = new Path("/TestAppendOverQuota");
|
||||
final Path file = new Path(dir, "file");
|
||||
|
||||
// create partial block file
|
||||
dfs.mkdirs(dir);
|
||||
DFSTestUtil.createFile(dfs, file, BLOCKSIZE/2, REPLICATION, seed);
|
||||
|
||||
// lower quota to cause exception when appending to partial block
|
||||
dfs.setQuota(dir, Long.MAX_VALUE - 1, 1);
|
||||
final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString())
|
||||
.asDirectory();
|
||||
final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
|
||||
.getSpaceConsumed().getStorageSpace();
|
||||
try {
|
||||
DFSTestUtil.appendFile(dfs, file, BLOCKSIZE);
|
||||
Assert.fail("append didn't fail");
|
||||
} catch (DSQuotaExceededException e) {
|
||||
// ignore
|
||||
}
|
||||
|
||||
// check that the file exists, isn't UC, and has no dangling lease
|
||||
INodeFile inode = fsdir.getINode(file.toString()).asFile();
|
||||
Assert.assertNotNull(inode);
|
||||
Assert.assertFalse("should not be UC", inode.isUnderConstruction());
|
||||
Assert.assertNull("should not have a lease", cluster.getNamesystem()
|
||||
.getLeaseManager().getLeaseByPath(file.toString()));
|
||||
// make sure the quota usage is unchanged
|
||||
final long newSpaceUsed = dirNode.getDirectoryWithQuotaFeature()
|
||||
.getSpaceConsumed().getStorageSpace();
|
||||
assertEquals(spaceUsed, newSpaceUsed);
|
||||
// make sure edits aren't corrupted
|
||||
dfs.recoverLease(file);
|
||||
cluster.restartNameNodes();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test append over a specific type of storage quota does not mark file as
|
||||
* UC or create a lease
|
||||
*/
|
||||
@Test (timeout=60000)
|
||||
public void testAppendOverTypeQuota() throws Exception {
|
||||
final Path dir = new Path("/TestAppendOverTypeQuota");
|
||||
final Path file = new Path(dir, "file");
|
||||
|
||||
// create partial block file
|
||||
dfs.mkdirs(dir);
|
||||
// set the storage policy on dir
|
||||
dfs.setStoragePolicy(dir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
|
||||
DFSTestUtil.createFile(dfs, file, BLOCKSIZE/2, REPLICATION, seed);
|
||||
|
||||
// set quota of SSD to 1L
|
||||
dfs.setQuotaByStorageType(dir, StorageType.SSD, 1L);
|
||||
final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString())
|
||||
.asDirectory();
|
||||
final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
|
||||
.getSpaceConsumed().getStorageSpace();
|
||||
try {
|
||||
DFSTestUtil.appendFile(dfs, file, BLOCKSIZE);
|
||||
Assert.fail("append didn't fail");
|
||||
} catch (RemoteException e) {
|
||||
assertTrue(e.getClassName().contains("QuotaByStorageTypeExceededException"));
|
||||
}
|
||||
|
||||
// check that the file exists, isn't UC, and has no dangling lease
|
||||
INodeFile inode = fsdir.getINode(file.toString()).asFile();
|
||||
Assert.assertNotNull(inode);
|
||||
Assert.assertFalse("should not be UC", inode.isUnderConstruction());
|
||||
Assert.assertNull("should not have a lease", cluster.getNamesystem()
|
||||
.getLeaseManager().getLeaseByPath(file.toString()));
|
||||
// make sure the quota usage is unchanged
|
||||
final long newSpaceUsed = dirNode.getDirectoryWithQuotaFeature()
|
||||
.getSpaceConsumed().getStorageSpace();
|
||||
assertEquals(spaceUsed, newSpaceUsed);
|
||||
// make sure edits aren't corrupted
|
||||
dfs.recoverLease(file);
|
||||
cluster.restartNameNodes();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue