HDFS-7587. Edit log corruption can happen if append fails with a quota violation. Contributed by Jing Zhao.
This commit is contained in:
parent
43b41f2241
commit
c7c71cdba5
|
@ -1205,6 +1205,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
HDFS-7929. inotify unable fetch pre-upgrade edit log segments once upgrade
|
HDFS-7929. inotify unable fetch pre-upgrade edit log segments once upgrade
|
||||||
starts (Zhe Zhang via Colin P. McCabe)
|
starts (Zhe Zhang via Colin P. McCabe)
|
||||||
|
|
||||||
|
HDFS-7587. Edit log corruption can happen if append fails with a quota
|
||||||
|
violation. (jing9)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
|
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
|
||||||
|
|
||||||
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
|
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
|
||||||
|
|
|
@ -2678,6 +2678,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
String leaseHolder, String clientMachine, boolean newBlock,
|
String leaseHolder, String clientMachine, boolean newBlock,
|
||||||
boolean writeToEditLog, boolean logRetryCache) throws IOException {
|
boolean writeToEditLog, boolean logRetryCache) throws IOException {
|
||||||
final INodeFile file = iip.getLastINode().asFile();
|
final INodeFile file = iip.getLastINode().asFile();
|
||||||
|
final QuotaCounts delta = verifyQuotaForUCBlock(file, iip);
|
||||||
|
|
||||||
file.recordModification(iip.getLatestSnapshotId());
|
file.recordModification(iip.getLatestSnapshotId());
|
||||||
file.toUnderConstruction(leaseHolder, clientMachine);
|
file.toUnderConstruction(leaseHolder, clientMachine);
|
||||||
|
|
||||||
|
@ -2687,10 +2689,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
LocatedBlock ret = null;
|
LocatedBlock ret = null;
|
||||||
if (!newBlock) {
|
if (!newBlock) {
|
||||||
ret = blockManager.convertLastBlockToUnderConstruction(file, 0);
|
ret = blockManager.convertLastBlockToUnderConstruction(file, 0);
|
||||||
if (ret != null) {
|
if (ret != null && delta != null) {
|
||||||
// update the quota: use the preferred block size for UC block
|
Preconditions.checkState(delta.getStorageSpace() >= 0,
|
||||||
final long diff = file.getPreferredBlockSize() - ret.getBlockSize();
|
"appending to a block with size larger than the preferred block size");
|
||||||
dir.updateSpaceConsumed(iip, 0, diff, file.getBlockReplication());
|
dir.writeLock();
|
||||||
|
try {
|
||||||
|
dir.updateCountNoQuotaCheck(iip, iip.length() - 1, delta);
|
||||||
|
} finally {
|
||||||
|
dir.writeUnlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
BlockInfoContiguous lastBlock = file.getLastBlock();
|
BlockInfoContiguous lastBlock = file.getLastBlock();
|
||||||
|
@ -2706,6 +2713,52 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify quota when using the preferred block size for UC block. This is
|
||||||
|
* usually used by append and truncate
|
||||||
|
* @throws QuotaExceededException when violating the storage quota
|
||||||
|
* @return expected quota usage update. null means no change or no need to
|
||||||
|
* update quota usage later
|
||||||
|
*/
|
||||||
|
private QuotaCounts verifyQuotaForUCBlock(INodeFile file, INodesInPath iip)
|
||||||
|
throws QuotaExceededException {
|
||||||
|
if (!isImageLoaded() || dir.shouldSkipQuotaChecks()) {
|
||||||
|
// Do not check quota if editlog is still being processed
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (file.getLastBlock() != null) {
|
||||||
|
final QuotaCounts delta = computeQuotaDeltaForUCBlock(file);
|
||||||
|
dir.readLock();
|
||||||
|
try {
|
||||||
|
FSDirectory.verifyQuota(iip, iip.length() - 1, delta, null);
|
||||||
|
return delta;
|
||||||
|
} finally {
|
||||||
|
dir.readUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Compute quota change for converting a complete block to a UC block */
|
||||||
|
private QuotaCounts computeQuotaDeltaForUCBlock(INodeFile file) {
|
||||||
|
final QuotaCounts delta = new QuotaCounts.Builder().build();
|
||||||
|
final BlockInfoContiguous lastBlock = file.getLastBlock();
|
||||||
|
if (lastBlock != null) {
|
||||||
|
final long diff = file.getPreferredBlockSize() - lastBlock.getNumBytes();
|
||||||
|
final short repl = file.getBlockReplication();
|
||||||
|
delta.addStorageSpace(diff * repl);
|
||||||
|
final BlockStoragePolicy policy = dir.getBlockStoragePolicySuite()
|
||||||
|
.getPolicy(file.getStoragePolicyID());
|
||||||
|
List<StorageType> types = policy.chooseStorageTypes(repl);
|
||||||
|
for (StorageType t : types) {
|
||||||
|
if (t.supportTypeQuota()) {
|
||||||
|
delta.addTypeSpace(t, diff);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return delta;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Recover lease;
|
* Recover lease;
|
||||||
* Immediately revoke the lease of the current lease holder and start lease
|
* Immediately revoke the lease of the current lease holder and start lease
|
||||||
|
@ -3106,7 +3159,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
// doesn't match up with what we think is the last block. There are
|
// doesn't match up with what we think is the last block. There are
|
||||||
// four possibilities:
|
// four possibilities:
|
||||||
// 1) This is the first block allocation of an append() pipeline
|
// 1) This is the first block allocation of an append() pipeline
|
||||||
// which started appending exactly at a block boundary.
|
// which started appending exactly at or exceeding the block boundary.
|
||||||
// In this case, the client isn't passed the previous block,
|
// In this case, the client isn't passed the previous block,
|
||||||
// so it makes the allocateBlock() call with previous=null.
|
// so it makes the allocateBlock() call with previous=null.
|
||||||
// We can distinguish this since the last block of the file
|
// We can distinguish this since the last block of the file
|
||||||
|
@ -3131,7 +3184,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
BlockInfoContiguous penultimateBlock = pendingFile.getPenultimateBlock();
|
BlockInfoContiguous penultimateBlock = pendingFile.getPenultimateBlock();
|
||||||
if (previous == null &&
|
if (previous == null &&
|
||||||
lastBlockInFile != null &&
|
lastBlockInFile != null &&
|
||||||
lastBlockInFile.getNumBytes() == pendingFile.getPreferredBlockSize() &&
|
lastBlockInFile.getNumBytes() >= pendingFile.getPreferredBlockSize() &&
|
||||||
lastBlockInFile.isComplete()) {
|
lastBlockInFile.isComplete()) {
|
||||||
// Case 1
|
// Case 1
|
||||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
|
|
|
@ -26,13 +26,18 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSOutputStream;
|
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -152,8 +157,7 @@ public class TestDiskspaceQuotaUpdate {
|
||||||
|
|
||||||
FSDataOutputStream out = dfs.append(bar);
|
FSDataOutputStream out = dfs.append(bar);
|
||||||
out.write(new byte[BLOCKSIZE / 4]);
|
out.write(new byte[BLOCKSIZE / 4]);
|
||||||
((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet
|
((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
|
||||||
.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
|
|
||||||
|
|
||||||
INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory();
|
INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory();
|
||||||
QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature()
|
QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature()
|
||||||
|
@ -182,4 +186,87 @@ public class TestDiskspaceQuotaUpdate {
|
||||||
assertEquals(2, ns); // foo and bar
|
assertEquals(2, ns); // foo and bar
|
||||||
assertEquals((BLOCKSIZE * 2 + BLOCKSIZE / 2) * REPLICATION, ds);
|
assertEquals((BLOCKSIZE * 2 + BLOCKSIZE / 2) * REPLICATION, ds);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test append over storage quota does not mark file as UC or create lease
|
||||||
|
*/
|
||||||
|
@Test (timeout=60000)
|
||||||
|
public void testAppendOverStorageQuota() throws Exception {
|
||||||
|
final Path dir = new Path("/TestAppendOverQuota");
|
||||||
|
final Path file = new Path(dir, "file");
|
||||||
|
|
||||||
|
// create partial block file
|
||||||
|
dfs.mkdirs(dir);
|
||||||
|
DFSTestUtil.createFile(dfs, file, BLOCKSIZE/2, REPLICATION, seed);
|
||||||
|
|
||||||
|
// lower quota to cause exception when appending to partial block
|
||||||
|
dfs.setQuota(dir, Long.MAX_VALUE - 1, 1);
|
||||||
|
final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString())
|
||||||
|
.asDirectory();
|
||||||
|
final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
|
||||||
|
.getSpaceConsumed().getStorageSpace();
|
||||||
|
try {
|
||||||
|
DFSTestUtil.appendFile(dfs, file, BLOCKSIZE);
|
||||||
|
Assert.fail("append didn't fail");
|
||||||
|
} catch (DSQuotaExceededException e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
|
||||||
|
// check that the file exists, isn't UC, and has no dangling lease
|
||||||
|
INodeFile inode = fsdir.getINode(file.toString()).asFile();
|
||||||
|
Assert.assertNotNull(inode);
|
||||||
|
Assert.assertFalse("should not be UC", inode.isUnderConstruction());
|
||||||
|
Assert.assertNull("should not have a lease", cluster.getNamesystem()
|
||||||
|
.getLeaseManager().getLeaseByPath(file.toString()));
|
||||||
|
// make sure the quota usage is unchanged
|
||||||
|
final long newSpaceUsed = dirNode.getDirectoryWithQuotaFeature()
|
||||||
|
.getSpaceConsumed().getStorageSpace();
|
||||||
|
assertEquals(spaceUsed, newSpaceUsed);
|
||||||
|
// make sure edits aren't corrupted
|
||||||
|
dfs.recoverLease(file);
|
||||||
|
cluster.restartNameNodes();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test append over a specific type of storage quota does not mark file as
|
||||||
|
* UC or create a lease
|
||||||
|
*/
|
||||||
|
@Test (timeout=60000)
|
||||||
|
public void testAppendOverTypeQuota() throws Exception {
|
||||||
|
final Path dir = new Path("/TestAppendOverTypeQuota");
|
||||||
|
final Path file = new Path(dir, "file");
|
||||||
|
|
||||||
|
// create partial block file
|
||||||
|
dfs.mkdirs(dir);
|
||||||
|
// set the storage policy on dir
|
||||||
|
dfs.setStoragePolicy(dir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
|
||||||
|
DFSTestUtil.createFile(dfs, file, BLOCKSIZE/2, REPLICATION, seed);
|
||||||
|
|
||||||
|
// set quota of SSD to 1L
|
||||||
|
dfs.setQuotaByStorageType(dir, StorageType.SSD, 1L);
|
||||||
|
final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString())
|
||||||
|
.asDirectory();
|
||||||
|
final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
|
||||||
|
.getSpaceConsumed().getStorageSpace();
|
||||||
|
try {
|
||||||
|
DFSTestUtil.appendFile(dfs, file, BLOCKSIZE);
|
||||||
|
Assert.fail("append didn't fail");
|
||||||
|
} catch (RemoteException e) {
|
||||||
|
assertTrue(e.getClassName().contains("QuotaByStorageTypeExceededException"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// check that the file exists, isn't UC, and has no dangling lease
|
||||||
|
INodeFile inode = fsdir.getINode(file.toString()).asFile();
|
||||||
|
Assert.assertNotNull(inode);
|
||||||
|
Assert.assertFalse("should not be UC", inode.isUnderConstruction());
|
||||||
|
Assert.assertNull("should not have a lease", cluster.getNamesystem()
|
||||||
|
.getLeaseManager().getLeaseByPath(file.toString()));
|
||||||
|
// make sure the quota usage is unchanged
|
||||||
|
final long newSpaceUsed = dirNode.getDirectoryWithQuotaFeature()
|
||||||
|
.getSpaceConsumed().getStorageSpace();
|
||||||
|
assertEquals(spaceUsed, newSpaceUsed);
|
||||||
|
// make sure edits aren't corrupted
|
||||||
|
dfs.recoverLease(file);
|
||||||
|
cluster.restartNameNodes();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue