Merge from trunk r:1358794 HDFS-3541. Deadlock between recovery, xceiver and packet responder. Contributed by Vinay.

Submitted by:Vivay	
Reviewed by:	Uma Maheswara Rao G, Kihwal Lee, Aaron


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1358798 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Uma Maheswara Rao G 2012-07-08 17:56:41 +00:00
parent b035a14d58
commit 2f7ad820e8
4 changed files with 78 additions and 0 deletions

View File

@ -262,6 +262,8 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3581. FSPermissionChecker#checkPermission sticky bit check HDFS-3581. FSPermissionChecker#checkPermission sticky bit check
missing range check. (eli) missing range check. (eli)
HDFS-3541. Deadlock between recovery, xceiver and packet responder (Vinay via umamahesh)
BREAKDOWN OF HDFS-3042 SUBTASKS BREAKDOWN OF HDFS-3042 SUBTASKS
HDFS-2185. HDFS portion of ZK-based FailoverController (todd) HDFS-2185. HDFS portion of ZK-based FailoverController (todd)

View File

@ -844,6 +844,7 @@ class BlockReceiver implements Closeable {
try { try {
responder.join(); responder.join();
} catch (InterruptedException e) { } catch (InterruptedException e) {
responder.interrupt();
throw new IOException("Interrupted receiveBlock"); throw new IOException("Interrupted receiveBlock");
} }
responder = null; responder = null;
@ -1018,6 +1019,7 @@ class BlockReceiver implements Closeable {
wait(); wait();
} catch (InterruptedException e) { } catch (InterruptedException e) {
running = false; running = false;
Thread.currentThread().interrupt();
} }
} }
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {

View File

@ -838,6 +838,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/ */
@Override // FsDatasetSpi @Override // FsDatasetSpi
public synchronized void finalizeBlock(ExtendedBlock b) throws IOException { public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread");
}
ReplicaInfo replicaInfo = getReplicaInfo(b); ReplicaInfo replicaInfo = getReplicaInfo(b);
if (replicaInfo.getState() == ReplicaState.FINALIZED) { if (replicaInfo.getState() == ReplicaState.FINALIZED) {
// this is legal, when recovery happens on a file that has // this is legal, when recovery happens on a file that has

View File

@ -38,21 +38,27 @@ import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@ -561,4 +567,68 @@ public class TestBlockRecovery {
streams.close(); streams.close();
} }
} }
/**
* Test to verify the race between finalizeBlock and Lease recovery
*
* @throws Exception
*/
@Test(timeout = 20000)
public void testRaceBetweenReplicaRecoveryAndFinalizeBlock() throws Exception {
tearDown();// Stop the Mocked DN started in startup()
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleSingleNN(8020, 50070))
.numDataNodes(1).build();
try {
cluster.waitClusterUp();
DistributedFileSystem fs = cluster.getFileSystem();
Path path = new Path("/test");
FSDataOutputStream out = fs.create(path);
out.writeBytes("data");
out.hsync();
List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs.open(path));
final LocatedBlock block = blocks.get(0);
final DataNode dataNode = cluster.getDataNodes().get(0);
final AtomicBoolean recoveryInitResult = new AtomicBoolean(true);
Thread recoveryThread = new Thread() {
public void run() {
try {
DatanodeInfo[] locations = block.getLocations();
final RecoveringBlock recoveringBlock = new RecoveringBlock(
block.getBlock(), locations, block.getBlock()
.getGenerationStamp() + 1);
synchronized (dataNode.data) {
Thread.sleep(2000);
dataNode.initReplicaRecovery(recoveringBlock);
}
} catch (Exception e) {
recoveryInitResult.set(false);
}
}
};
recoveryThread.start();
try {
out.close();
} catch (IOException e) {
Assert.assertTrue("Writing should fail",
e.getMessage().contains("are bad. Aborting..."));
} finally {
recoveryThread.join();
}
Assert.assertTrue("Recovery should be initiated successfully",
recoveryInitResult.get());
dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock()
.getGenerationStamp() + 1, block.getBlockSize());
} finally {
if (null != cluster) {
cluster.shutdown();
cluster = null;
}
}
}
} }