Merge HDFS-4516. Client crash after block allocation and NN switch before lease recovery for the same file can cause readers to fail forever. Contributed by Vinay.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1543839 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
49377f273e
commit
b0bd967090
|
@ -300,6 +300,9 @@ Release 2.2.1 - UNRELEASED
|
||||||
HDFS-5372. In FSNamesystem, hasReadLock() returns false if the current
|
HDFS-5372. In FSNamesystem, hasReadLock() returns false if the current
|
||||||
thread holds the write lock (Vinaykumar B via umamahesh)
|
thread holds the write lock (Vinaykumar B via umamahesh)
|
||||||
|
|
||||||
|
HDFS-4516. Client crash after block allocation and NN switch before lease recovery for
|
||||||
|
the same file can cause readers to fail forever (VinaayKumar B via umamahesh)
|
||||||
|
|
||||||
Release 2.2.0 - 2013-10-13
|
Release 2.2.0 - 2013-10-13
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -2332,6 +2332,11 @@ public class DFSClient implements java.io.Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
ExtendedBlock getPreviousBlock(String file) {
|
||||||
|
return filesBeingWritten.get(file).getBlock();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* enable/disable restore failed storage.
|
* enable/disable restore failed storage.
|
||||||
*
|
*
|
||||||
|
|
|
@ -290,6 +290,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
|
final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
|
||||||
if (last != null) {
|
if (last != null) {
|
||||||
if (last.getLocations().length == 0) {
|
if (last.getLocations().length == 0) {
|
||||||
|
if (last.getBlockSize() == 0) {
|
||||||
|
// if the length is zero, then no data has been written to
|
||||||
|
// datanode. So no need to wait for the locations.
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
final long len = readBlockLength(last);
|
final long len = readBlockLength(last);
|
||||||
|
|
|
@ -1718,7 +1718,8 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
|
|
||||||
waitForAckedSeqno(toWaitFor);
|
waitForAckedSeqno(toWaitFor);
|
||||||
|
|
||||||
if (updateLength) {
|
// update the block length first time irrespective of flag
|
||||||
|
if (updateLength || persistBlocks.get()) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (streamer != null && streamer.block != null) {
|
if (streamer != null && streamer.block != null) {
|
||||||
lastBlockLength = streamer.block.getNumBytes();
|
lastBlockLength = streamer.block.getNumBytes();
|
||||||
|
@ -1984,4 +1985,14 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
public void setDropBehind(Boolean dropBehind) throws IOException {
|
public void setDropBehind(Boolean dropBehind) throws IOException {
|
||||||
this.cachingStrategy.setDropBehind(dropBehind);
|
this.cachingStrategy.setDropBehind(dropBehind);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
ExtendedBlock getBlock() {
|
||||||
|
return streamer.getBlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
long getFileId() {
|
||||||
|
return fileId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3684,6 +3684,19 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
if (uc.getNumExpectedLocations() == 0) {
|
if (uc.getNumExpectedLocations() == 0) {
|
||||||
uc.setExpectedLocations(blockManager.getNodes(lastBlock));
|
uc.setExpectedLocations(blockManager.getNodes(lastBlock));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (uc.getNumExpectedLocations() == 0 && uc.getNumBytes() == 0) {
|
||||||
|
// There is no datanode reported to this block.
|
||||||
|
// may be client have crashed before writing data to pipeline.
|
||||||
|
// This blocks doesn't need any recovery.
|
||||||
|
// We can remove this block and close the file.
|
||||||
|
pendingFile.removeLastBlock(lastBlock);
|
||||||
|
finalizeINodeFileUnderConstruction(src, pendingFile,
|
||||||
|
iip.getLatestSnapshot());
|
||||||
|
NameNode.stateChangeLog.warn("BLOCK* internalReleaseLease: "
|
||||||
|
+ "Removed empty last block and closed file.");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
// start recovery of the last block for this file
|
// start recovery of the last block for this file
|
||||||
long blockRecoveryId = nextGenerationStamp(isLegacyBlock(uc));
|
long blockRecoveryId = nextGenerationStamp(isLegacyBlock(uc));
|
||||||
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
|
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
|
|
||||||
public class DFSClientAdapter {
|
public class DFSClientAdapter {
|
||||||
|
@ -43,4 +44,21 @@ public class DFSClientAdapter {
|
||||||
String src, long start, long length) throws IOException {
|
String src, long start, long length) throws IOException {
|
||||||
return DFSClient.callGetBlockLocations(namenode, src, start, length);
|
return DFSClient.callGetBlockLocations(namenode, src, start, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static ClientProtocol getNamenode(DFSClient client) throws IOException {
|
||||||
|
return client.namenode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static DFSClient getClient(DistributedFileSystem dfs)
|
||||||
|
throws IOException {
|
||||||
|
return dfs.dfs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ExtendedBlock getPreviousBlock(DFSClient client, String file) {
|
||||||
|
return client.getPreviousBlock(file);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static long getFileId(DFSOutputStream out) {
|
||||||
|
return out.getFileId();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -175,7 +175,7 @@ public class TestPersistBlocks {
|
||||||
// This would mean that blocks were successfully persisted to the log
|
// This would mean that blocks were successfully persisted to the log
|
||||||
FileStatus status = fs.getFileStatus(FILE_PATH);
|
FileStatus status = fs.getFileStatus(FILE_PATH);
|
||||||
assertTrue("Length incorrect: " + status.getLen(),
|
assertTrue("Length incorrect: " + status.getLen(),
|
||||||
status.getLen() != len - BLOCK_SIZE);
|
status.getLen() == len - BLOCK_SIZE);
|
||||||
|
|
||||||
// Verify the data showed up from before restart, sans abandoned block.
|
// Verify the data showed up from before restart, sans abandoned block.
|
||||||
FSDataInputStream readStream = fs.open(FILE_PATH);
|
FSDataInputStream readStream = fs.open(FILE_PATH);
|
||||||
|
|
|
@ -34,16 +34,23 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
|
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClientAdapter;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
||||||
|
@ -766,4 +773,50 @@ public class TestHASafeMode {
|
||||||
assertFalse("ANN should be out of SafeMode", dfsWithFailOver.isInSafeMode());
|
assertFalse("ANN should be out of SafeMode", dfsWithFailOver.isInSafeMode());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Test NN crash and client crash/stuck immediately after block allocation */
|
||||||
|
@Test(timeout = 100000)
|
||||||
|
public void testOpenFileWhenNNAndClientCrashAfterAddBlock() throws Exception {
|
||||||
|
cluster.getConfiguration(0).set(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "1.0f");
|
||||||
|
String testData = "testData";
|
||||||
|
// to make sure we write the full block before creating dummy block at NN.
|
||||||
|
cluster.getConfiguration(0).setInt("io.bytes.per.checksum",
|
||||||
|
testData.length());
|
||||||
|
cluster.restartNameNode(0);
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
cluster.transitionToStandby(1);
|
||||||
|
DistributedFileSystem dfs = cluster.getFileSystem(0);
|
||||||
|
String pathString = "/tmp1.txt";
|
||||||
|
Path filePath = new Path(pathString);
|
||||||
|
FSDataOutputStream create = dfs.create(filePath,
|
||||||
|
FsPermission.getDefault(), true, 1024, (short) 3, testData.length(),
|
||||||
|
null);
|
||||||
|
create.write(testData.getBytes());
|
||||||
|
create.hflush();
|
||||||
|
DFSClient client = DFSClientAdapter.getClient(dfs);
|
||||||
|
// add one dummy block at NN, but not write to DataNode
|
||||||
|
ExtendedBlock previousBlock = DFSClientAdapter.getPreviousBlock(client,
|
||||||
|
pathString);
|
||||||
|
DFSClientAdapter.getNamenode(client).addBlock(
|
||||||
|
pathString,
|
||||||
|
client.getClientName(),
|
||||||
|
new ExtendedBlock(previousBlock),
|
||||||
|
new DatanodeInfo[0],
|
||||||
|
DFSClientAdapter.getFileId((DFSOutputStream) create
|
||||||
|
.getWrappedStream()), null);
|
||||||
|
cluster.restartNameNode(0, true);
|
||||||
|
cluster.restartDataNode(0);
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
// let the block reports be processed.
|
||||||
|
Thread.sleep(2000);
|
||||||
|
FSDataInputStream is = dfs.open(filePath);
|
||||||
|
is.close();
|
||||||
|
dfs.recoverLease(filePath);// initiate recovery
|
||||||
|
assertTrue("Recovery also should be success", dfs.recoverLease(filePath));
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue