HDFS-5016. Merge 1507189 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1507191 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 2013-07-26 04:47:25 +00:00
parent 3bd39b2ea1
commit e3d5a147d0
9 changed files with 78 additions and 23 deletions

StringUtils.java

@@ -907,4 +907,16 @@ public class StringUtils {
     buf.putLong(uuid.getLeastSignificantBits());
     return buf.array();
   }
+
+  /**
+   * Get stack trace for a given thread.
+   */
+  public static String getStackTrace(Thread t) {
+    final StackTraceElement[] stackTrace = t.getStackTrace();
+    StringBuilder str = new StringBuilder();
+    for (StackTraceElement e : stackTrace) {
+      str.append(e.toString() + "\n");
+    }
+    return str.toString();
+  }
 }
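The helper above pairs naturally with a bounded Thread.join(), which is the pattern the datanode-side changes below adopt. A minimal, hypothetical sketch of that usage (not part of this commit); the class name StackTraceDemo, the thread name "slow-worker", and the timeout values are illustrative only:

import org.apache.hadoop.util.StringUtils;

public class StackTraceDemo {
  public static void main(String[] args) throws InterruptedException {
    // Illustrative worker that stays busy long enough for the bounded join to time out.
    Thread worker = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(60000);
        } catch (InterruptedException ignored) {
        }
      }
    }, "slow-worker");
    worker.start();

    worker.join(100);  // bounded join instead of an unbounded join()
    if (worker.isAlive()) {
      // Log where the stuck thread is, then give up rather than hang forever.
      System.err.println("Join on " + worker + " timed out\n"
          + StringUtils.getStackTrace(worker));
      worker.interrupt();
    }
  }
}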

CHANGES.txt

@@ -516,6 +516,9 @@ Release 2.1.0-beta - 2013-07-02
     HDFS-4602. TestBookKeeperHACheckpoints fails. (umamahesh)
 
+    HDFS-5016. Deadlock in pipeline recovery causes Datanode to be marked dead.
+    (suresh)
+
   BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
 
     HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.

DFSConfigKeys.java

@@ -495,4 +495,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
   public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
   public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
+
+  // Hidden configuration undocumented in hdfs-site.xml
+  // Timeout to wait for block receiver and responder thread to stop
+  public static final String DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY = "dfs.datanode.xceiver.stop.timeout.millis";
+  public static final long DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT = 60000;
 }
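Although the key above is intentionally left undocumented, it can still be overridden like any other configuration value when a deployment or test needs a different stop timeout, either in hdfs-site.xml under dfs.datanode.xceiver.stop.timeout.millis or programmatically. A minimal sketch, assuming only the constants introduced above; the class name TimeoutTuning and the 5-second value are illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class TimeoutTuning {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Value is in milliseconds; the shipped default is 60000 (one minute).
    conf.setLong(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 5000L);

    long timeout = conf.getLong(
        DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
        DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
    System.out.println("xceiver stop timeout = " + timeout + " ms");
  }
}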

BlockReceiver.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -731,7 +732,13 @@ class BlockReceiver implements Closeable {
       }
       if (responder != null) {
         try {
-          responder.join();
+          responder.join(datanode.getDnConf().getXceiverStopTimeout());
+          if (responder.isAlive()) {
+            String msg = "Join on responder thread " + responder
+                + " timed out";
+            LOG.warn(msg + "\n" + StringUtils.getStackTrace(responder));
+            throw new IOException(msg);
+          }
         } catch (InterruptedException e) {
           responder.interrupt();
           throw new IOException("Interrupted receiveBlock");

DNConf.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
@@ -29,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY;
@@ -44,7 +47,8 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
  * Simple class encapsulating all of the configuration that the DataNode
  * loads at startup time.
  */
-class DNConf {
+@InterfaceAudience.Private
+public class DNConf {
   final int socketTimeout;
   final int socketWriteTimeout;
   final int socketKeepaliveTimeout;
@@ -67,6 +71,8 @@ class DNConf {
   final String minimumNameNodeVersion;
   final String encryptionAlgorithm;
+
+  final long xceiverStopTimeout;
 
   public DNConf(Configuration conf) {
     socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
         HdfsServerConstants.READ_TIMEOUT);
@@ -127,10 +133,18 @@ class DNConf {
     this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY,
         DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
     this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
+
+    this.xceiverStopTimeout = conf.getLong(
+        DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
+        DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
   }
 
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
   String getMinimumNameNodeVersion() {
     return this.minimumNameNodeVersion;
   }
+
+  public long getXceiverStopTimeout() {
+    return xceiverStopTimeout;
+  }
 }

DataNode.java

@@ -2448,7 +2448,7 @@ public class DataNode extends Configured
     return dxcs.balanceThrottler.getBandwidth();
   }
 
-  DNConf getDnConf() {
+  public DNConf getDnConf() {
     return dnConf;
   }

ReplicaInPipeline.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * This class defines a replica in a pipeline, which
@@ -150,11 +151,16 @@ public class ReplicaInPipeline extends ReplicaInfo
    * Interrupt the writing thread and wait until it dies
    * @throws IOException the waiting is interrupted
    */
-  public void stopWriter() throws IOException {
+  public void stopWriter(long xceiverStopTimeout) throws IOException {
     if (writer != null && writer != Thread.currentThread() && writer.isAlive()) {
       writer.interrupt();
       try {
-        writer.join();
+        writer.join(xceiverStopTimeout);
+        if (writer.isAlive()) {
+          final String msg = "Join on writer thread " + writer + " timed out";
+          DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(writer));
+          throw new IOException(msg);
+        }
       } catch (InterruptedException e) {
         throw new IOException("Waiting for writer thread is interrupted.");
       }

FsDatasetImpl.java

@@ -76,7 +76,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.DataChecksum;
@@ -615,7 +614,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (replicaInfo.getState() == ReplicaState.RBW) {
       ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
       // kill the previous writer
-      rbw.stopWriter();
+      rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
       rbw.setWriter(Thread.currentThread());
       // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
       if (replicaLen != rbw.getBytesOnDisk()
@@ -735,7 +734,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     LOG.info("Recovering " + rbw);
 
     // Stop the previous writer
-    rbw.stopWriter();
+    rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
     rbw.setWriter(Thread.currentThread());
 
     // check generation stamp
@@ -1451,13 +1450,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public synchronized ReplicaRecoveryInfo initReplicaRecovery(
       RecoveringBlock rBlock) throws IOException {
-    return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(),
-        volumeMap, rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp());
+    return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(), volumeMap,
+        rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp(),
+        datanode.getDnConf().getXceiverStopTimeout());
   }
 
   /** static version of {@link #initReplicaRecovery(Block, long)}. */
-  static ReplicaRecoveryInfo initReplicaRecovery(String bpid,
-      ReplicaMap map, Block block, long recoveryId) throws IOException {
+  static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
+      Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
     final ReplicaInfo replica = map.get(bpid, block.getBlockId());
     LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
         + ", replica=" + replica);
@@ -1470,7 +1470,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     //stop writer if there is any
     if (replica instanceof ReplicaInPipeline) {
       final ReplicaInPipeline rip = (ReplicaInPipeline)replica;
-      rip.stopWriter();
+      rip.stopWriter(xceiverStopTimeout);
 
       //check replica bytes on disk.
       if (rip.getBytesOnDisk() < rip.getVisibleLength()) {

TestInterDatanodeProtocol.java

@@ -167,7 +167,7 @@ public class TestInterDatanodeProtocol {
     cluster.waitActive();
 
     //create a file
-    DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+    DistributedFileSystem dfs = cluster.getFileSystem();
     String filestr = "/foo";
     Path filepath = new Path(filestr);
     DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
@@ -225,7 +225,7 @@ public class TestInterDatanodeProtocol {
   }
 
   /** Test
-   * {@link FsDatasetImpl#initReplicaRecovery(String, ReplicaMap, Block, long)}
+   * {@link FsDatasetImpl#initReplicaRecovery(String, ReplicaMap, Block, long, long)}
    */
   @Test
   public void testInitReplicaRecovery() throws IOException {
@@ -246,8 +246,9 @@ public class TestInterDatanodeProtocol {
       final ReplicaInfo originalInfo = map.get(bpid, b);
 
       final long recoveryid = gs + 1;
-      final ReplicaRecoveryInfo recoveryInfo = FsDatasetImpl.initReplicaRecovery(
-          bpid, map, blocks[0], recoveryid);
+      final ReplicaRecoveryInfo recoveryInfo = FsDatasetImpl
+          .initReplicaRecovery(bpid, map, blocks[0], recoveryid,
+              DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
       assertEquals(originalInfo, recoveryInfo);
 
       final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(bpid, b);
@@ -256,7 +257,9 @@ public class TestInterDatanodeProtocol {
 
       //recover one more time
       final long recoveryid2 = gs + 2;
-      final ReplicaRecoveryInfo recoveryInfo2 = FsDatasetImpl.initReplicaRecovery(bpid, map, blocks[0], recoveryid2);
+      final ReplicaRecoveryInfo recoveryInfo2 = FsDatasetImpl
+          .initReplicaRecovery(bpid, map, blocks[0], recoveryid2,
+              DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
       assertEquals(originalInfo, recoveryInfo2);
 
       final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(bpid, b);
@@ -265,7 +268,8 @@ public class TestInterDatanodeProtocol {
 
       //case RecoveryInProgressException
       try {
-        FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
+        FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid,
+            DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
         Assert.fail();
       }
       catch(RecoveryInProgressException ripe) {
@@ -276,7 +280,9 @@ public class TestInterDatanodeProtocol {
 
     { // BlockRecoveryFI_01: replica not found
       final long recoveryid = gs + 1;
       final Block b = new Block(firstblockid - 1, length, gs);
-      ReplicaRecoveryInfo r = FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
+      ReplicaRecoveryInfo r = FsDatasetImpl.initReplicaRecovery(bpid, map, b,
+          recoveryid,
+          DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
       Assert.assertNull("Data-node should not have this replica.", r);
     }
@@ -284,7 +290,8 @@ public class TestInterDatanodeProtocol {
       final long recoveryid = gs - 1;
       final Block b = new Block(firstblockid + 1, length, gs);
       try {
-        FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
+        FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid,
+            DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
         Assert.fail();
       }
       catch(IOException ioe) {
@@ -297,7 +304,8 @@ public class TestInterDatanodeProtocol {
       final long recoveryid = gs + 1;
       final Block b = new Block(firstblockid, length, gs+1);
       try {
-        FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
+        FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid,
+            DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
         fail("InitReplicaRecovery should fail because replica's " +
             "gs is less than the block's gs");
       } catch (IOException e) {
@@ -321,7 +329,7 @@ public class TestInterDatanodeProtocol {
     String bpid = cluster.getNamesystem().getBlockPoolId();
 
     //create a file
-    DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+    DistributedFileSystem dfs = cluster.getFileSystem();
     String filestr = "/foo";
     Path filepath = new Path(filestr);
     DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);