HDFS-7996. After swapping a volume, BlockReceiver reports ReplicaNotFoundException (Lei (Eddy) Xu via Colin P. McCabe)

(cherry picked from commit 023133cef9)
This commit is contained in:
Colin Patrick Mccabe 2015-04-03 14:19:46 -07:00
parent 386b90a700
commit 0c5069c432
3 changed files with 63 additions and 19 deletions

View File

@ -1072,6 +1072,9 @@ Release 2.7.0 - UNRELEASED
HDFS-8039. Fix TestDebugAdmin#testRecoverLease and HDFS-8039. Fix TestDebugAdmin#testRecoverLease and
testVerifyBlockChecksumCommand on Windows. (Xiaoyu Yao via cnauroth) testVerifyBlockChecksumCommand on Windows. (Xiaoyu Yao via cnauroth)
HDFS-7996. After swapping a volume, BlockReceiver reports
ReplicaNotFoundException (Lei (Eddy) Xu via Colin P. McCabe)
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

View File

@ -281,7 +281,7 @@ class BlockReceiver implements Closeable {
} }
/** /**
* close files. * close files and release volume reference.
*/ */
@Override @Override
public void close() throws IOException { public void close() throws IOException {
@ -798,17 +798,20 @@ class BlockReceiver implements Closeable {
// then finalize block or convert temporary to RBW. // then finalize block or convert temporary to RBW.
// For client-writes, the block is finalized in the PacketResponder. // For client-writes, the block is finalized in the PacketResponder.
if (isDatanode || isTransfer) { if (isDatanode || isTransfer) {
// close the block/crc files // Hold a volume reference to finalize block.
close(); try (ReplicaHandler handler = claimReplicaHandler()) {
block.setNumBytes(replicaInfo.getNumBytes()); // close the block/crc files
close();
block.setNumBytes(replicaInfo.getNumBytes());
if (stage == BlockConstructionStage.TRANSFER_RBW) { if (stage == BlockConstructionStage.TRANSFER_RBW) {
// for TRANSFER_RBW, convert temporary to RBW // for TRANSFER_RBW, convert temporary to RBW
datanode.data.convertTemporaryToRbw(block); datanode.data.convertTemporaryToRbw(block);
} else { } else {
// for isDatnode or TRANSFER_FINALIZED // for isDatnode or TRANSFER_FINALIZED
// Finalize the block. // Finalize the block.
datanode.data.finalizeBlock(block); datanode.data.finalizeBlock(block);
}
} }
datanode.metrics.incrBlocksWritten(); datanode.metrics.incrBlocksWritten();
} }
@ -980,7 +983,14 @@ class BlockReceiver implements Closeable {
} }
return partialCrc; return partialCrc;
} }
/** The caller claims the ownership of the replica handler. */
private ReplicaHandler claimReplicaHandler() {
ReplicaHandler handler = replicaHandler;
replicaHandler = null;
return handler;
}
private static enum PacketResponderType { private static enum PacketResponderType {
NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
} }
@ -1280,12 +1290,15 @@ class BlockReceiver implements Closeable {
* @param startTime time when BlockReceiver started receiving the block * @param startTime time when BlockReceiver started receiving the block
*/ */
private void finalizeBlock(long startTime) throws IOException { private void finalizeBlock(long startTime) throws IOException {
BlockReceiver.this.close(); long endTime = 0;
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() // Hold a volume reference to finalize block.
: 0; try (ReplicaHandler handler = BlockReceiver.this.claimReplicaHandler()) {
block.setNumBytes(replicaInfo.getNumBytes()); BlockReceiver.this.close();
datanode.data.finalizeBlock(block); endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
}
if (pinning) { if (pinning) {
datanode.data.setPinning(block); datanode.data.setPinning(block);
} }

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@ -64,6 +65,8 @@ import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.anyOf;
@ -77,6 +80,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.timeout;
public class TestDataNodeHotSwapVolumes { public class TestDataNodeHotSwapVolumes {
@ -577,6 +581,7 @@ public class TestDataNodeHotSwapVolumes {
final DataNode dn = cluster.getDataNodes().get(dataNodeIdx); final DataNode dn = cluster.getDataNodes().get(dataNodeIdx);
final FileSystem fs = cluster.getFileSystem(); final FileSystem fs = cluster.getFileSystem();
final Path testFile = new Path("/test"); final Path testFile = new Path("/test");
final long lastTimeDiskErrorCheck = dn.getLastDiskErrorCheck();
FSDataOutputStream out = fs.create(testFile, REPLICATION); FSDataOutputStream out = fs.create(testFile, REPLICATION);
@ -586,6 +591,23 @@ public class TestDataNodeHotSwapVolumes {
out.write(writeBuf); out.write(writeBuf);
out.hflush(); out.hflush();
// Make FsDatasetSpi#finalizeBlock a time-consuming operation. So if the
// BlockReceiver releases volume reference before finalizeBlock(), the blocks
// on the volume will be removed, and finalizeBlock() throws IOE.
final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
dn.data = Mockito.spy(data);
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation)
throws IOException, InterruptedException {
Thread.sleep(1000);
// Bypass the argument to FsDatasetImpl#finalizeBlock to verify that
// the block is not removed, since the volume reference should not
// be released at this point.
data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0]);
return null;
}
}).when(dn.data).finalizeBlock(any(ExtendedBlock.class));
final CyclicBarrier barrier = new CyclicBarrier(2); final CyclicBarrier barrier = new CyclicBarrier(2);
List<String> oldDirs = getDataDirs(dn); List<String> oldDirs = getDataDirs(dn);
@ -612,13 +634,19 @@ public class TestDataNodeHotSwapVolumes {
out.hflush(); out.hflush();
out.close(); out.close();
reconfigThread.join();
// Verify the file has sufficient replications. // Verify the file has sufficient replications.
DFSTestUtil.waitReplication(fs, testFile, REPLICATION); DFSTestUtil.waitReplication(fs, testFile, REPLICATION);
// Read the content back // Read the content back
byte[] content = DFSTestUtil.readFileBuffer(fs, testFile); byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
assertEquals(BLOCK_SIZE, content.length); assertEquals(BLOCK_SIZE, content.length);
reconfigThread.join(); // If an IOException thrown from BlockReceiver#run, it triggers
// DataNode#checkDiskError(). So we can test whether checkDiskError() is called,
// to see whether there is IOException in BlockReceiver#run().
assertEquals(lastTimeDiskErrorCheck, dn.getLastDiskErrorCheck());
if (!exceptions.isEmpty()) { if (!exceptions.isEmpty()) {
throw new IOException(exceptions.get(0).getCause()); throw new IOException(exceptions.get(0).getCause());
} }