diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index ce0184bd92f..0d50feb4ef9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -262,6 +262,8 @@ Release 2.0.1-alpha - UNRELEASED HDFS-3581. FSPermissionChecker#checkPermission sticky bit check missing range check. (eli) + HDFS-3541. Deadlock between recovery, xceiver and packet responder (Vinay via umamahesh) + BREAKDOWN OF HDFS-3042 SUBTASKS HDFS-2185. HDFS portion of ZK-based FailoverController (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index d379927be35..dec7b82e1d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -844,6 +844,7 @@ class BlockReceiver implements Closeable { try { responder.join(); } catch (InterruptedException e) { + responder.interrupt(); throw new IOException("Interrupted receiveBlock"); } responder = null; @@ -1018,6 +1019,7 @@ class BlockReceiver implements Closeable { wait(); } catch (InterruptedException e) { running = false; + Thread.currentThread().interrupt(); } } if(LOG.isDebugEnabled()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 00d5d94ae79..c9701242467 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -838,6 +838,10 @@ class FsDatasetImpl implements FsDatasetSpi { */ @Override // FsDatasetSpi public synchronized void finalizeBlock(ExtendedBlock b) throws IOException { + if (Thread.interrupted()) { + // Don't allow data modifications from interrupted threads + throw new IOException("Cannot finalize block from Interrupted Thread"); + } ReplicaInfo replicaInfo = getReplicaInfo(b); if (replicaInfo.getState() == ReplicaState.FINALIZED) { // this is legal, when recovery happens on a file that has diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 3ec710113a1..378a4bab98a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -38,21 +38,27 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; @@ -561,4 +567,68 @@ public class TestBlockRecovery { streams.close(); } } + + /** + * Test to verify the race between finalizeBlock and Lease recovery + * + * @throws Exception + */ + @Test(timeout = 20000) + public void testRaceBetweenReplicaRecoveryAndFinalizeBlock() throws Exception { + tearDown();// Stop the Mocked DN started in startup() + + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleSingleNN(8020, 50070)) + .numDataNodes(1).build(); + try { + cluster.waitClusterUp(); + DistributedFileSystem fs = cluster.getFileSystem(); + Path path = new Path("/test"); + FSDataOutputStream out = fs.create(path); + out.writeBytes("data"); + out.hsync(); + + List blocks = DFSTestUtil.getAllBlocks(fs.open(path)); + final LocatedBlock block = blocks.get(0); + final DataNode dataNode = cluster.getDataNodes().get(0); + + final AtomicBoolean recoveryInitResult = new AtomicBoolean(true); + Thread recoveryThread = new Thread() { + public void run() { + try { + DatanodeInfo[] locations = block.getLocations(); + final RecoveringBlock recoveringBlock = new RecoveringBlock( + block.getBlock(), locations, block.getBlock() + .getGenerationStamp() + 1); + synchronized (dataNode.data) { + Thread.sleep(2000); + dataNode.initReplicaRecovery(recoveringBlock); + } + } catch (Exception e) { + recoveryInitResult.set(false); + } + } + }; + recoveryThread.start(); + try { + out.close(); + } catch (IOException e) { + Assert.assertTrue("Writing should fail", + e.getMessage().contains("are bad. Aborting...")); + } finally { + recoveryThread.join(); + } + Assert.assertTrue("Recovery should be initiated successfully", + recoveryInitResult.get()); + + dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock() + .getGenerationStamp() + 1, block.getBlockSize()); + } finally { + if (null != cluster) { + cluster.shutdown(); + cluster = null; + } + } + } }