HDFS-4883. complete() should verify fileId. Contributed by Tao Luo.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1495302 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6ecf78a99b
commit
92cbba386f
|
@ -124,6 +124,8 @@ Trunk (Unreleased)
|
||||||
|
|
||||||
HDFS-4772. Add number of children in HdfsFileStatus. (brandonli)
|
HDFS-4772. Add number of children in HdfsFileStatus. (brandonli)
|
||||||
|
|
||||||
|
HDFS-4883. complete() should verify fileId. (Tao Luo via shv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -1875,7 +1875,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
||||||
long localstart = Time.now();
|
long localstart = Time.now();
|
||||||
boolean fileComplete = false;
|
boolean fileComplete = false;
|
||||||
while (!fileComplete) {
|
while (!fileComplete) {
|
||||||
fileComplete = dfsClient.namenode.complete(src, dfsClient.clientName, last);
|
fileComplete =
|
||||||
|
dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
|
||||||
if (!fileComplete) {
|
if (!fileComplete) {
|
||||||
final int hdfsTimeout = dfsClient.getHdfsTimeout();
|
final int hdfsTimeout = dfsClient.getHdfsTimeout();
|
||||||
if (!dfsClient.clientRunning ||
|
if (!dfsClient.clientRunning ||
|
||||||
|
|
|
@ -369,6 +369,13 @@ public interface ClientProtocol {
|
||||||
* DataNode failures may cause a client to call complete() several
|
* DataNode failures may cause a client to call complete() several
|
||||||
* times before succeeding.
|
* times before succeeding.
|
||||||
*
|
*
|
||||||
|
* @param src the file being created
|
||||||
|
* @param clientName the name of the client that adds the block
|
||||||
|
* @param last the last block info
|
||||||
|
* @param fileId the id uniquely identifying a file
|
||||||
|
*
|
||||||
|
* @return true if all file blocks are minimally replicated or false otherwise
|
||||||
|
*
|
||||||
* @throws AccessControlException If access is denied
|
* @throws AccessControlException If access is denied
|
||||||
* @throws FileNotFoundException If file <code>src</code> is not found
|
* @throws FileNotFoundException If file <code>src</code> is not found
|
||||||
* @throws SafeModeException create not allowed in safemode
|
* @throws SafeModeException create not allowed in safemode
|
||||||
|
@ -376,7 +383,8 @@ public interface ClientProtocol {
|
||||||
* @throws IOException If an I/O error occurred
|
* @throws IOException If an I/O error occurred
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
public boolean complete(String src, String clientName, ExtendedBlock last)
|
public boolean complete(String src, String clientName,
|
||||||
|
ExtendedBlock last, long fileId)
|
||||||
throws AccessControlException, FileNotFoundException, SafeModeException,
|
throws AccessControlException, FileNotFoundException, SafeModeException,
|
||||||
UnresolvedLinkException, IOException;
|
UnresolvedLinkException, IOException;
|
||||||
|
|
||||||
|
|
|
@ -141,6 +141,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.INodeId;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
|
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
|
||||||
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
|
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
|
||||||
|
@ -426,7 +427,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
||||||
try {
|
try {
|
||||||
boolean result =
|
boolean result =
|
||||||
server.complete(req.getSrc(), req.getClientName(),
|
server.complete(req.getSrc(), req.getClientName(),
|
||||||
req.hasLast() ? PBHelper.convert(req.getLast()) : null);
|
req.hasLast() ? PBHelper.convert(req.getLast()) : null,
|
||||||
|
req.hasFileId() ? req.getFileId() : INodeId.GRANDFATHER_INODE_ID);
|
||||||
return CompleteResponseProto.newBuilder().setResult(result).build();
|
return CompleteResponseProto.newBuilder().setResult(result).build();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
|
|
|
@ -357,12 +357,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean complete(String src, String clientName, ExtendedBlock last)
|
public boolean complete(String src, String clientName,
|
||||||
|
ExtendedBlock last, long fileId)
|
||||||
throws AccessControlException, FileNotFoundException, SafeModeException,
|
throws AccessControlException, FileNotFoundException, SafeModeException,
|
||||||
UnresolvedLinkException, IOException {
|
UnresolvedLinkException, IOException {
|
||||||
CompleteRequestProto.Builder req = CompleteRequestProto.newBuilder()
|
CompleteRequestProto.Builder req = CompleteRequestProto.newBuilder()
|
||||||
.setSrc(src)
|
.setSrc(src)
|
||||||
.setClientName(clientName);
|
.setClientName(clientName)
|
||||||
|
.setFileId(fileId);
|
||||||
if (last != null)
|
if (last != null)
|
||||||
req.setLast(PBHelper.convert(last));
|
req.setLast(PBHelper.convert(last));
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -2577,7 +2577,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
* (e.g if not all blocks have reached minimum replication yet)
|
* (e.g if not all blocks have reached minimum replication yet)
|
||||||
* @throws IOException on error (eg lease mismatch, file not open, file deleted)
|
* @throws IOException on error (eg lease mismatch, file not open, file deleted)
|
||||||
*/
|
*/
|
||||||
boolean completeFile(String src, String holder, ExtendedBlock last)
|
boolean completeFile(String src, String holder,
|
||||||
|
ExtendedBlock last, long fileId)
|
||||||
throws SafeModeException, UnresolvedLinkException, IOException {
|
throws SafeModeException, UnresolvedLinkException, IOException {
|
||||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
|
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
|
||||||
|
@ -2594,8 +2595,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
throw new SafeModeException("Cannot complete file " + src, safeMode);
|
throw new SafeModeException("Cannot complete file " + src, safeMode);
|
||||||
}
|
}
|
||||||
src = FSDirectory.resolvePath(src, pathComponents, dir);
|
src = FSDirectory.resolvePath(src, pathComponents, dir);
|
||||||
success = completeFileInternal(src, holder,
|
success = completeFileInternal(src, holder,
|
||||||
ExtendedBlock.getLocalBlock(last));
|
ExtendedBlock.getLocalBlock(last), fileId);
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
}
|
}
|
||||||
|
@ -2606,14 +2607,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean completeFileInternal(String src,
|
private boolean completeFileInternal(String src,
|
||||||
String holder, Block last) throws SafeModeException,
|
String holder, Block last, long fileId) throws SafeModeException,
|
||||||
UnresolvedLinkException, IOException {
|
UnresolvedLinkException, IOException {
|
||||||
assert hasWriteLock();
|
assert hasWriteLock();
|
||||||
final INodesInPath iip = dir.getLastINodeInPath(src);
|
final INodesInPath iip = dir.getLastINodeInPath(src);
|
||||||
final INodeFileUnderConstruction pendingFile;
|
final INodeFileUnderConstruction pendingFile;
|
||||||
try {
|
try {
|
||||||
pendingFile = checkLease(src, INodeId.GRANDFATHER_INODE_ID,
|
pendingFile = checkLease(src, fileId, holder, iip.getINode(0));
|
||||||
holder, iip.getINode(0));
|
|
||||||
} catch (LeaseExpiredException lee) {
|
} catch (LeaseExpiredException lee) {
|
||||||
final INode inode = dir.getINode(src);
|
final INode inode = dir.getINode(src);
|
||||||
if (inode != null
|
if (inode != null
|
||||||
|
|
|
@ -569,13 +569,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
public boolean complete(String src, String clientName, ExtendedBlock last)
|
public boolean complete(String src, String clientName,
|
||||||
|
ExtendedBlock last, long fileId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if(stateChangeLog.isDebugEnabled()) {
|
if(stateChangeLog.isDebugEnabled()) {
|
||||||
stateChangeLog.debug("*DIR* NameNode.complete: "
|
stateChangeLog.debug("*DIR* NameNode.complete: "
|
||||||
+ src + " for " + clientName);
|
+ src + " fileId=" + fileId +" for " + clientName);
|
||||||
}
|
}
|
||||||
return namesystem.completeFile(src, clientName, last);
|
return namesystem.completeFile(src, clientName, last, fileId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -145,6 +145,7 @@ message CompleteRequestProto {
|
||||||
required string src = 1;
|
required string src = 1;
|
||||||
required string clientName = 2;
|
required string clientName = 2;
|
||||||
optional ExtendedBlockProto last = 3;
|
optional ExtendedBlockProto last = 3;
|
||||||
|
optional uint64 fileId = 4 [default = 0]; // default to GRANDFATHER_INODE_ID
|
||||||
}
|
}
|
||||||
|
|
||||||
message CompleteResponseProto {
|
message CompleteResponseProto {
|
||||||
|
|
|
@ -423,7 +423,7 @@ public class TestDFSClientRetries {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}).when(spyNN).complete(Mockito.anyString(), Mockito.anyString(),
|
}).when(spyNN).complete(Mockito.anyString(), Mockito.anyString(),
|
||||||
Mockito.<ExtendedBlock>any());
|
Mockito.<ExtendedBlock>any(), anyLong());
|
||||||
|
|
||||||
OutputStream stm = client.create(file.toString(), true);
|
OutputStream stm = client.create(file.toString(), true);
|
||||||
try {
|
try {
|
||||||
|
@ -441,7 +441,7 @@ public class TestDFSClientRetries {
|
||||||
Mockito.anyLong(), Mockito.<String[]> any());
|
Mockito.anyLong(), Mockito.<String[]> any());
|
||||||
Mockito.verify(spyNN, Mockito.atLeastOnce()).complete(
|
Mockito.verify(spyNN, Mockito.atLeastOnce()).complete(
|
||||||
Mockito.anyString(), Mockito.anyString(),
|
Mockito.anyString(), Mockito.anyString(),
|
||||||
Mockito.<ExtendedBlock>any());
|
Mockito.<ExtendedBlock>any(), anyLong());
|
||||||
|
|
||||||
AppendTestUtil.check(fs, file, 10000);
|
AppendTestUtil.check(fs, file, 10000);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.anyLong;
|
||||||
import static org.mockito.Matchers.anyObject;
|
import static org.mockito.Matchers.anyObject;
|
||||||
import static org.mockito.Matchers.anyString;
|
import static org.mockito.Matchers.anyString;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
@ -156,7 +157,7 @@ public class TestFileAppend4 {
|
||||||
// Delay completeFile
|
// Delay completeFile
|
||||||
GenericTestUtils.DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
|
GenericTestUtils.DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
|
||||||
doAnswer(delayer).when(spyNN).complete(
|
doAnswer(delayer).when(spyNN).complete(
|
||||||
anyString(), anyString(), (ExtendedBlock)anyObject());
|
anyString(), anyString(), (ExtendedBlock)anyObject(), anyLong());
|
||||||
|
|
||||||
DFSClient client = new DFSClient(null, spyNN, conf, null);
|
DFSClient client = new DFSClient(null, spyNN, conf, null);
|
||||||
file1 = new Path("/testRecoverFinalized");
|
file1 = new Path("/testRecoverFinalized");
|
||||||
|
@ -229,7 +230,7 @@ public class TestFileAppend4 {
|
||||||
GenericTestUtils.DelayAnswer delayer =
|
GenericTestUtils.DelayAnswer delayer =
|
||||||
new GenericTestUtils.DelayAnswer(LOG);
|
new GenericTestUtils.DelayAnswer(LOG);
|
||||||
doAnswer(delayer).when(spyNN).complete(anyString(), anyString(),
|
doAnswer(delayer).when(spyNN).complete(anyString(), anyString(),
|
||||||
(ExtendedBlock) anyObject());
|
(ExtendedBlock) anyObject(), anyLong());
|
||||||
|
|
||||||
DFSClient client = new DFSClient(null, spyNN, conf, null);
|
DFSClient client = new DFSClient(null, spyNN, conf, null);
|
||||||
file1 = new Path("/testCompleteOtherLease");
|
file1 = new Path("/testCompleteOtherLease");
|
||||||
|
|
|
@ -1163,4 +1163,37 @@ public class TestFileCreation {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test complete(..) - verifies that the fileId in the request
|
||||||
|
* matches that of the Inode.
|
||||||
|
* This test checks that FileNotFoundException exception is thrown in case
|
||||||
|
* the fileId does not match.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testFileIdMismatch() throws IOException {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
MiniDFSCluster cluster =
|
||||||
|
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||||
|
DistributedFileSystem dfs = null;
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
dfs = (DistributedFileSystem)cluster.getFileSystem();
|
||||||
|
DFSClient client = dfs.dfs;
|
||||||
|
|
||||||
|
final Path f = new Path("/testFileIdMismatch.txt");
|
||||||
|
createFile(dfs, f, 3);
|
||||||
|
long someOtherFileId = -1;
|
||||||
|
try {
|
||||||
|
cluster.getNameNodeRpc()
|
||||||
|
.complete(f.toString(), client.clientName, null, someOtherFileId);
|
||||||
|
fail();
|
||||||
|
} catch(FileNotFoundException fnf) {
|
||||||
|
FileSystem.LOG.info("Caught Expected FileNotFoundException: ", fnf);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
IOUtils.closeStream(dfs);
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -590,7 +590,7 @@ public class NNThroughputBenchmark {
|
||||||
long end = Time.now();
|
long end = Time.now();
|
||||||
for(boolean written = !closeUponCreate; !written;
|
for(boolean written = !closeUponCreate; !written;
|
||||||
written = nameNodeProto.complete(fileNames[daemonId][inputIdx],
|
written = nameNodeProto.complete(fileNames[daemonId][inputIdx],
|
||||||
clientName, null));
|
clientName, null, INodeId.GRANDFATHER_INODE_ID));
|
||||||
return end-start;
|
return end-start;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1046,7 +1046,7 @@ public class NNThroughputBenchmark {
|
||||||
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
|
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
|
||||||
BLOCK_SIZE);
|
BLOCK_SIZE);
|
||||||
ExtendedBlock lastBlock = addBlocks(fileName, clientName);
|
ExtendedBlock lastBlock = addBlocks(fileName, clientName);
|
||||||
nameNodeProto.complete(fileName, clientName, lastBlock);
|
nameNodeProto.complete(fileName, clientName, lastBlock, INodeId.GRANDFATHER_INODE_ID);
|
||||||
}
|
}
|
||||||
// prepare block reports
|
// prepare block reports
|
||||||
for(int idx=0; idx < nrDatanodes; idx++) {
|
for(int idx=0; idx < nrDatanodes; idx++) {
|
||||||
|
|
Loading…
Reference in New Issue