From e3d5a147d0cf9a8d19c65a2d75e438984fcf2d11 Mon Sep 17 00:00:00 2001
From: Suresh Srinivas
Date: Fri, 26 Jul 2013 04:47:25 +0000
Subject: [PATCH] HDFS-5016. Merge 1507189 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1507191 13f79535-47bb-0310-9956-ffa450edef68
---
 .../org/apache/hadoop/util/StringUtils.java   | 12 ++++++++
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  3 ++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |  5 ++++
 .../hdfs/server/datanode/BlockReceiver.java   |  9 +++++-
 .../hadoop/hdfs/server/datanode/DNConf.java   | 16 ++++++++++-
 .../hadoop/hdfs/server/datanode/DataNode.java |  2 +-
 .../server/datanode/ReplicaInPipeline.java    | 10 +++++--
 .../fsdataset/impl/FsDatasetImpl.java         | 16 +++++------
 .../impl/TestInterDatanodeProtocol.java       | 28 ++++++++++++-------
 9 files changed, 78 insertions(+), 23 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
index 27096b441ae..cba68685ed0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
@@ -907,4 +907,16 @@ public class StringUtils {
     buf.putLong(uuid.getLeastSignificantBits());
     return buf.array();
   }
+
+  /**
+   * Get stack trace for a given thread.
+   */
+  public static String getStackTrace(Thread t) {
+    final StackTraceElement[] stackTrace = t.getStackTrace();
+    StringBuilder str = new StringBuilder();
+    for (StackTraceElement e : stackTrace) {
+      str.append(e.toString() + "\n");
+    }
+    return str.toString();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1df4aa90661..24ed712588d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -516,6 +516,9 @@ Release 2.1.0-beta - 2013-07-02
 
     HDFS-4602. TestBookKeeperHACheckpoints fails.  (umamahesh)
 
+    HDFS-5016. Deadlock in pipeline recovery causes Datanode to be marked dead.
+    (suresh)
+
   BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
 
     HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b32c15f3d36..7bba4f503a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -495,4 +495,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
   public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
   public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
+
+  // Hidden configuration undocumented in hdfs-site.xml
+  // Timeout to wait for block receiver and responder thread to stop
+  public static final String DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY = "dfs.datanode.xceiver.stop.timeout.millis";
+  public static final long DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT = 60000;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 9dc7a045efd..667874978fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -731,7 +732,13 @@ class BlockReceiver implements Closeable {
       }
       if (responder != null) {
         try {
-          responder.join();
+          responder.join(datanode.getDnConf().getXceiverStopTimeout());
+          if (responder.isAlive()) {
+            String msg = "Join on responder thread " + responder
+                + " timed out";
+            LOG.warn(msg + "\n" + StringUtils.getStackTrace(responder));
+            throw new IOException(msg);
+          }
         } catch (InterruptedException e) {
           responder.interrupt();
           throw new IOException("Interrupted receiveBlock");
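
The crux of the fix above: BlockReceiver previously called responder.join() with no timeout, so a responder thread wedged during pipeline recovery could block the DataNode indefinitely until the NameNode marked the node dead (the deadlock named in the CHANGES.txt entry). The patch bounds the join and turns a stuck thread into a logged, diagnosable IOException. The following standalone sketch distills that pattern; BoundedJoinDemo, stopThread, and the deliberately unresponsive worker are illustrative names, not code from this patch:

    import java.io.IOException;

    // Sketch of the bounded-join pattern applied in BlockReceiver above:
    // interrupt the thread, join with a timeout, and if it is still alive
    // afterwards, capture its stack and fail fast instead of waiting forever.
    public class BoundedJoinDemo {
      static void stopThread(Thread t, long timeoutMillis) throws IOException {
        t.interrupt();
        try {
          t.join(timeoutMillis);            // wait at most timeoutMillis
          if (t.isAlive()) {                // join() returned via timeout
            StringBuilder stack = new StringBuilder();
            for (StackTraceElement e : t.getStackTrace()) {
              stack.append(e).append('\n');
            }
            throw new IOException("Join on thread " + t
                + " timed out; stack:\n" + stack);
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while joining " + t, e);
        }
      }

      public static void main(String[] args) throws IOException {
        // Simulates a wedged responder: ignores interrupts, never exits.
        Thread stuck = new Thread(() -> {
          while (true) { }
        });
        stuck.setDaemon(true);
        stuck.start();
        stopThread(stuck, 1000);            // throws after ~1s instead of hanging
      }
    }

A cooperative thread still exits promptly on the interrupt; only a wedged one costs the caller the full timeout, which is exactly the trade the DataNode makes here.
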
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index d4b0ffd1f85..1577d78eddc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
@@ -29,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFA
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY;
@@ -44,7 +47,8 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
  * Simple class encapsulating all of the configuration that the DataNode
  * loads at startup time.
  */
-class DNConf {
+@InterfaceAudience.Private
+public class DNConf {
   final int socketTimeout;
   final int socketWriteTimeout;
   final int socketKeepaliveTimeout;
@@ -66,6 +70,8 @@ class DNConf {
   final String minimumNameNodeVersion;
   final String encryptionAlgorithm;
+  
+  final long xceiverStopTimeout;
 
   public DNConf(Configuration conf) {
     socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@@ -127,10 +133,18 @@ class DNConf {
     this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY,
         DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
     this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
+    
+    this.xceiverStopTimeout = conf.getLong(
+        DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
+        DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
   }
 
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
   String getMinimumNameNodeVersion() {
     return this.minimumNameNodeVersion;
   }
+  
+  public long getXceiverStopTimeout() {
+    return xceiverStopTimeout;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 65225e478d8..b865fe0abff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2448,7 +2448,7 @@ public class DataNode extends Configured
     return dxcs.balanceThrottler.getBandwidth();
   }
 
-  DNConf getDnConf() {
+  public DNConf getDnConf() {
     return dnConf;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index a9ef36dd969..9ea7b7d7e74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * This class defines a replica in a pipeline, which
@@ -150,11 +151,16 @@ public class ReplicaInPipeline extends ReplicaInfo
    * Interrupt the writing thread and wait until it dies
    * @throws IOException the waiting is interrupted
    */
-  public void stopWriter() throws IOException {
+  public void stopWriter(long xceiverStopTimeout) throws IOException {
     if (writer != null && writer != Thread.currentThread() && writer.isAlive()) {
       writer.interrupt();
       try {
-        writer.join();
+        writer.join(xceiverStopTimeout);
+        if (writer.isAlive()) {
+          final String msg = "Join on writer thread " + writer + " timed out";
+          DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(writer));
+          throw new IOException(msg);
+        }
       } catch (InterruptedException e) {
         throw new IOException("Waiting for writer thread is interrupted.");
       }
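
With DNConf now public and annotated @InterfaceAudience.Private, the timeout is read from configuration once at DataNode startup and handed to BlockReceiver and ReplicaInPipeline.stopWriter() through getXceiverStopTimeout(). Below is a minimal sketch of how that value resolves from a Hadoop Configuration; the key string and 60-second default mirror the DFSConfigKeys constants added above, while the XceiverStopTimeoutExample class itself is hypothetical:

    import org.apache.hadoop.conf.Configuration;

    public class XceiverStopTimeoutExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The key is hidden (undocumented in hdfs-site.xml) but can still be
        // overridden there, e.g. raised to 120000 for slow disks.
        long timeoutMillis = conf.getLong(
            "dfs.datanode.xceiver.stop.timeout.millis", 60000L);
        System.out.println("xceiver stop timeout = " + timeoutMillis + " ms");
      }
    }

Caching the value in a final DNConf field keeps the stop path free of configuration lookups, and the public getter gives callers and tests a single seam to observe the effective timeout.
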
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index c252e77e763..ba0d2a873cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -76,7 +76,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.DataChecksum;
@@ -615,7 +614,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (replicaInfo.getState() == ReplicaState.RBW) {
       ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
       // kill the previous writer
-      rbw.stopWriter();
+      rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
       rbw.setWriter(Thread.currentThread());
       // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
       if (replicaLen != rbw.getBytesOnDisk()
@@ -735,7 +734,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     LOG.info("Recovering " + rbw);
 
     // Stop the previous writer
-    rbw.stopWriter();
+    rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
     rbw.setWriter(Thread.currentThread());
     // check generation stamp
@@ -1451,13 +1450,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public synchronized ReplicaRecoveryInfo initReplicaRecovery(
       RecoveringBlock rBlock) throws IOException {
-    return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(),
-        volumeMap, rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp());
+    return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(), volumeMap,
+        rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp(),
+        datanode.getDnConf().getXceiverStopTimeout());
   }
 
   /** static version of {@link #initReplicaRecovery(Block, long)}. */
-  static ReplicaRecoveryInfo initReplicaRecovery(String bpid,
-      ReplicaMap map, Block block, long recoveryId) throws IOException {
+  static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
+      Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
     final ReplicaInfo replica = map.get(bpid, block.getBlockId());
     LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
         + ", replica=" + replica);
@@ -1470,7 +1470,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     //stop writer if there is any
     if (replica instanceof ReplicaInPipeline) {
       final ReplicaInPipeline rip = (ReplicaInPipeline)replica;
-      rip.stopWriter();
+      rip.stopWriter(xceiverStopTimeout);
 
       //check replica bytes on disk.
       if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
index b1733efa9f3..ff2c9894086 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
@@ -167,7 +167,7 @@ public class TestInterDatanodeProtocol {
     cluster.waitActive();
 
     //create a file
-    DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+    DistributedFileSystem dfs = cluster.getFileSystem();
     String filestr = "/foo";
     Path filepath = new Path(filestr);
     DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
@@ -225,7 +225,7 @@ public class TestInterDatanodeProtocol {
   }
 
   /** Test 
-   * {@link FsDatasetImpl#initReplicaRecovery(String, ReplicaMap, Block, long)}
+   * {@link FsDatasetImpl#initReplicaRecovery(String, ReplicaMap, Block, long, long)}
    */
   @Test
   public void testInitReplicaRecovery() throws IOException {
@@ -246,8 +246,9 @@ public class TestInterDatanodeProtocol {
       final ReplicaInfo originalInfo = map.get(bpid, b);
 
       final long recoveryid = gs + 1;
-      final ReplicaRecoveryInfo recoveryInfo = FsDatasetImpl.initReplicaRecovery(
-          bpid, map, blocks[0], recoveryid);
+      final ReplicaRecoveryInfo recoveryInfo = FsDatasetImpl
+          .initReplicaRecovery(bpid, map, blocks[0], recoveryid,
+              DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
       assertEquals(originalInfo, recoveryInfo);
 
       final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(bpid, b);
@@ -256,7 +257,9 @@ public class TestInterDatanodeProtocol {
 
       //recover one more time
       final long recoveryid2 = gs + 2;
-      final ReplicaRecoveryInfo recoveryInfo2 = FsDatasetImpl.initReplicaRecovery(bpid, map, blocks[0], recoveryid2);
+      final ReplicaRecoveryInfo recoveryInfo2 = FsDatasetImpl
+          .initReplicaRecovery(bpid, map, blocks[0], recoveryid2,
+              DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
       assertEquals(originalInfo, recoveryInfo2);
 
       final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(bpid, b);
@@ -265,7 +268,8 @@ public class TestInterDatanodeProtocol {
 
       //case RecoveryInProgressException
       try {
-        FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
+        FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid,
+            DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
         Assert.fail();
       }
       catch(RecoveryInProgressException ripe) {
@@ -276,7 +280,9 @@ public class TestInterDatanodeProtocol {
     { // BlockRecoveryFI_01: replica not found
       final long recoveryid = gs + 1;
       final Block b = new Block(firstblockid - 1, length, gs);
-      ReplicaRecoveryInfo r = FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
+      ReplicaRecoveryInfo r = FsDatasetImpl.initReplicaRecovery(bpid, map, b,
+          recoveryid,
+          DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
       Assert.assertNull("Data-node should not have this replica.", r);
     }
 
@@ -284,7 +290,8 @@ public class TestInterDatanodeProtocol {
       final long recoveryid = gs - 1;
       final Block b = new Block(firstblockid + 1, length, gs);
       try {
-        FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
+        FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid,
+            DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
         Assert.fail();
       }
       catch(IOException ioe) {
@@ -297,7 +304,8 @@ public class TestInterDatanodeProtocol {
       final long recoveryid = gs + 1;
       final Block b = new Block(firstblockid, length, gs+1);
       try {
-        FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
+        FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid,
+            DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
         fail("InitReplicaRecovery should fail because replica's " +
             "gs is less than the block's gs");
       } catch (IOException e) {
@@ -321,7 +329,7 @@ public class TestInterDatanodeProtocol {
     String bpid = cluster.getNamesystem().getBlockPoolId();
 
     //create a file
-    DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+    DistributedFileSystem dfs = cluster.getFileSystem();
     String filestr = "/foo";
     Path filepath = new Path(filestr);
     DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
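
The test updates pass DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT explicitly because the static initReplicaRecovery() overload has no DataNode, and therefore no DNConf, to consult. When a bounded join does time out in production, the new StringUtils.getStackTrace(Thread) is what makes the WARN log actionable. A standalone sketch of that helper's behavior follows; the StackTraceDemo class is hypothetical, but the loop matches the StringUtils addition at the top of this patch:

    public class StackTraceDemo {
      // Same logic as the StringUtils.getStackTrace(Thread) added above.
      static String getStackTrace(Thread t) {
        StringBuilder str = new StringBuilder();
        for (StackTraceElement e : t.getStackTrace()) {
          str.append(e.toString()).append('\n');
        }
        return str.toString();
      }

      public static void main(String[] args) {
        // Prints the current thread's frames, the same diagnostic the
        // DataNode now logs for a writer or responder that will not stop.
        System.out.println(getStackTrace(Thread.currentThread()));
      }
    }
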