HDFS-8496. Calling stopWriter() with FSDatasetImpl lock held may block other threads (cmccabe)

(cherry picked from commit f6b1a81812)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
This commit is contained in:
Colin Patrick Mccabe 2016-04-04 18:00:26 -07:00
parent 236d1a5093
commit d55af8b5a3
4 changed files with 257 additions and 81 deletions

View File

@ -22,6 +22,7 @@ import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@ -44,7 +45,7 @@ public class ReplicaInPipeline extends ReplicaInfo
private long bytesAcked; private long bytesAcked;
private long bytesOnDisk; private long bytesOnDisk;
private byte[] lastChecksum; private byte[] lastChecksum;
private Thread writer; private AtomicReference<Thread> writer = new AtomicReference<Thread>();
/** /**
* Bytes reserved for this replica on the containing volume. * Bytes reserved for this replica on the containing volume.
@ -97,7 +98,7 @@ public class ReplicaInPipeline extends ReplicaInfo
super( blockId, len, genStamp, vol, dir); super( blockId, len, genStamp, vol, dir);
this.bytesAcked = len; this.bytesAcked = len;
this.bytesOnDisk = len; this.bytesOnDisk = len;
this.writer = writer; this.writer.set(writer);
this.bytesReserved = bytesToReserve; this.bytesReserved = bytesToReserve;
this.originalBytesReserved = bytesToReserve; this.originalBytesReserved = bytesToReserve;
} }
@ -110,7 +111,7 @@ public class ReplicaInPipeline extends ReplicaInfo
super(from); super(from);
this.bytesAcked = from.getBytesAcked(); this.bytesAcked = from.getBytesAcked();
this.bytesOnDisk = from.getBytesOnDisk(); this.bytesOnDisk = from.getBytesOnDisk();
this.writer = from.writer; this.writer.set(from.writer.get());
this.bytesReserved = from.bytesReserved; this.bytesReserved = from.bytesReserved;
this.originalBytesReserved = from.originalBytesReserved; this.originalBytesReserved = from.originalBytesReserved;
} }
@ -175,18 +176,11 @@ public class ReplicaInPipeline extends ReplicaInfo
return new ChunkChecksum(getBytesOnDisk(), lastChecksum); return new ChunkChecksum(getBytesOnDisk(), lastChecksum);
} }
/**
* Set the thread that is writing to this replica
* @param writer a thread writing to this replica
*/
public void setWriter(Thread writer) {
this.writer = writer;
}
public void interruptThread() { public void interruptThread() {
if (writer != null && writer != Thread.currentThread() Thread thread = writer.get();
&& writer.isAlive()) { if (thread != null && thread != Thread.currentThread()
this.writer.interrupt(); && thread.isAlive()) {
thread.interrupt();
} }
} }
@ -195,18 +189,36 @@ public class ReplicaInPipeline extends ReplicaInfo
return super.equals(o); return super.equals(o);
} }
/**
* Attempt to set the writer to a new value.
*/
public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) {
return writer.compareAndSet(prevWriter, newWriter);
}
/** /**
* Interrupt the writing thread and wait until it dies * Interrupt the writing thread and wait until it dies
* @throws IOException the waiting is interrupted * @throws IOException the waiting is interrupted
*/ */
public void stopWriter(long xceiverStopTimeout) throws IOException { public void stopWriter(long xceiverStopTimeout) throws IOException {
if (writer != null && writer != Thread.currentThread() && writer.isAlive()) { while (true) {
writer.interrupt(); Thread thread = writer.get();
if ((thread == null) || (thread == Thread.currentThread()) ||
(!thread.isAlive())) {
if (writer.compareAndSet(thread, null) == true) {
return; // Done
}
// The writer changed. Go back to the start of the loop and attempt to
// stop the new writer.
continue;
}
thread.interrupt();
try { try {
writer.join(xceiverStopTimeout); thread.join(xceiverStopTimeout);
if (writer.isAlive()) { if (thread.isAlive()) {
final String msg = "Join on writer thread " + writer + " timed out"; // Our thread join timed out.
DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(writer)); final String msg = "Join on writer thread " + thread + " timed out";
DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(thread));
throw new IOException(msg); throw new IOException(msg);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -1212,8 +1212,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return newReplicaInfo; return newReplicaInfo;
} }
private static class MustStopExistingWriter extends Exception {
private final ReplicaInPipeline rip;
MustStopExistingWriter(ReplicaInPipeline rip) {
this.rip = rip;
}
ReplicaInPipeline getReplica() {
return rip;
}
}
private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS, private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS,
long expectedBlockLen) throws IOException { long expectedBlockLen) throws IOException, MustStopExistingWriter {
ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
// check state // check state
@ -1237,9 +1249,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long replicaLen = replicaInfo.getNumBytes(); long replicaLen = replicaInfo.getNumBytes();
if (replicaInfo.getState() == ReplicaState.RBW) { if (replicaInfo.getState() == ReplicaState.RBW) {
ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo; ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
// kill the previous writer if (!rbw.attemptToSetWriter(null, Thread.currentThread())) {
rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout()); throw new MustStopExistingWriter(rbw);
rbw.setWriter(Thread.currentThread()); }
// check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
if (replicaLen != rbw.getBytesOnDisk() if (replicaLen != rbw.getBytesOnDisk()
|| replicaLen != rbw.getBytesAcked()) { || replicaLen != rbw.getBytesAcked()) {
@ -1265,39 +1277,55 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
LOG.info("Recover failed append to " + b); LOG.info("Recover failed append to " + b);
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); while (true) {
try {
synchronized (this) {
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
ReplicaBeingWritten replica; ReplicaBeingWritten replica;
try { try {
// change the replica's state/gs etc. // change the replica's state/gs etc.
if (replicaInfo.getState() == ReplicaState.FINALIZED) { if (replicaInfo.getState() == ReplicaState.FINALIZED) {
replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo, replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo,
newGS, b.getNumBytes()); newGS, b.getNumBytes());
} else { //RBW } else { //RBW
bumpReplicaGS(replicaInfo, newGS); bumpReplicaGS(replicaInfo, newGS);
replica = (ReplicaBeingWritten) replicaInfo; replica = (ReplicaBeingWritten) replicaInfo;
}
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
}
return new ReplicaHandler(replica, ref);
}
} catch (MustStopExistingWriter e) {
e.getReplica().stopWriter(datanode.getDnConf().getXceiverStopTimeout());
} }
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
} }
return new ReplicaHandler(replica, ref);
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public synchronized Replica recoverClose(ExtendedBlock b, long newGS, public synchronized Replica recoverClose(ExtendedBlock b, long newGS,
long expectedBlockLen) throws IOException { long expectedBlockLen) throws IOException {
LOG.info("Recover failed close " + b); LOG.info("Recover failed close " + b);
// check replica's state while (true) {
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); try {
// bump the replica's GS synchronized (this) {
bumpReplicaGS(replicaInfo, newGS); // check replica's state
// finalize the replica if RBW ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
if (replicaInfo.getState() == ReplicaState.RBW) { // bump the replica's GS
finalizeReplica(b.getBlockPoolId(), replicaInfo); bumpReplicaGS(replicaInfo, newGS);
// finalize the replica if RBW
if (replicaInfo.getState() == ReplicaState.RBW) {
finalizeReplica(b.getBlockPoolId(), replicaInfo);
}
return replicaInfo;
}
} catch (MustStopExistingWriter e) {
e.getReplica().stopWriter(datanode.getDnConf().getXceiverStopTimeout());
}
} }
return replicaInfo;
} }
/** /**
@ -1389,26 +1417,37 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public synchronized ReplicaHandler recoverRbw( public ReplicaHandler recoverRbw(
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException { throws IOException {
LOG.info("Recover RBW replica " + b); LOG.info("Recover RBW replica " + b);
ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); while (true) {
try {
synchronized (this) {
ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
// check the replica's state // check the replica's state
if (replicaInfo.getState() != ReplicaState.RBW) { if (replicaInfo.getState() != ReplicaState.RBW) {
throw new ReplicaNotFoundException( throw new ReplicaNotFoundException(
ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo); ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
}
ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
if (!rbw.attemptToSetWriter(null, Thread.currentThread())) {
throw new MustStopExistingWriter(rbw);
}
LOG.info("Recovering " + rbw);
return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd);
}
} catch (MustStopExistingWriter e) {
e.getReplica().stopWriter(datanode.getDnConf().getXceiverStopTimeout());
}
} }
ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo; }
LOG.info("Recovering " + rbw);
// Stop the previous writer
rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
rbw.setWriter(Thread.currentThread());
private synchronized ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
// check generation stamp // check generation stamp
long replicaGenerationStamp = rbw.getGenerationStamp(); long replicaGenerationStamp = rbw.getGenerationStamp();
if (replicaGenerationStamp < b.getGenerationStamp() || if (replicaGenerationStamp < b.getGenerationStamp() ||
@ -1424,7 +1463,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long numBytes = rbw.getNumBytes(); long numBytes = rbw.getNumBytes();
if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){ if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){
throw new ReplicaNotFoundException("Unmatched length replica " + throw new ReplicaNotFoundException("Unmatched length replica " +
replicaInfo + ": BytesAcked = " + bytesAcked + rbw + ": BytesAcked = " + bytesAcked +
" BytesRcvd = " + numBytes + " are not in the range of [" + " BytesRcvd = " + numBytes + " are not in the range of [" +
minBytesRcvd + ", " + maxBytesRcvd + "]."); minBytesRcvd + ", " + maxBytesRcvd + "].");
} }
@ -2356,8 +2395,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public synchronized ReplicaRecoveryInfo initReplicaRecovery( public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
RecoveringBlock rBlock) throws IOException { throws IOException {
return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(), volumeMap, return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(), volumeMap,
rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp(), rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp(),
datanode.getDnConf().getXceiverStopTimeout()); datanode.getDnConf().getXceiverStopTimeout());
@ -2366,6 +2405,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
/** static version of {@link #initReplicaRecovery(RecoveringBlock)}. */ /** static version of {@link #initReplicaRecovery(RecoveringBlock)}. */
static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map, static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
Block block, long recoveryId, long xceiverStopTimeout) throws IOException { Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
while (true) {
try {
synchronized (map.getMutex()) {
return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
}
} catch (MustStopExistingWriter e) {
e.getReplica().stopWriter(xceiverStopTimeout);
}
}
}
static ReplicaRecoveryInfo initReplicaRecoveryImpl(String bpid, ReplicaMap map,
Block block, long recoveryId)
throws IOException, MustStopExistingWriter {
final ReplicaInfo replica = map.get(bpid, block.getBlockId()); final ReplicaInfo replica = map.get(bpid, block.getBlockId());
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
+ ", replica=" + replica); + ", replica=" + replica);
@ -2378,7 +2431,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
//stop writer if there is any //stop writer if there is any
if (replica instanceof ReplicaInPipeline) { if (replica instanceof ReplicaInPipeline) {
final ReplicaInPipeline rip = (ReplicaInPipeline)replica; final ReplicaInPipeline rip = (ReplicaInPipeline)replica;
rip.stopWriter(xceiverStopTimeout); if (!rip.attemptToSetWriter(null, Thread.currentThread())) {
throw new MustStopExistingWriter(rip);
}
//check replica bytes on disk. //check replica bytes on disk.
if (rip.getBytesOnDisk() < rip.getVisibleLength()) { if (rip.getBytesOnDisk() < rip.getVisibleLength()) {

View File

@ -220,7 +220,7 @@ class ReplicaMap {
* Give access to mutex used for synchronizing ReplicasMap * Give access to mutex used for synchronizing ReplicasMap
* @return object used as lock * @return object used as lock
*/ */
Object getMutext() { Object getMutex() {
return mutex; return mutex;
} }
} }

View File

@ -40,10 +40,12 @@ import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.collect.Iterators;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -86,7 +88,9 @@ import org.apache.log4j.Level;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -117,11 +121,17 @@ public class TestBlockRecovery {
private final static ExtendedBlock block = new ExtendedBlock(POOL_ID, private final static ExtendedBlock block = new ExtendedBlock(POOL_ID,
BLOCK_ID, BLOCK_LEN, GEN_STAMP); BLOCK_ID, BLOCK_LEN, GEN_STAMP);
@Rule
public TestName currentTestName = new TestName();
static { static {
GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.ALL); GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.ALL);
GenericTestUtils.setLogLevel(LOG, Level.ALL); GenericTestUtils.setLogLevel(LOG, Level.ALL);
} }
private final long
TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS = 1000000000L;
/** /**
* Starts an instance of DataNode * Starts an instance of DataNode
* @throws IOException * @throws IOException
@ -133,6 +143,12 @@ public class TestBlockRecovery {
conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0"); conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0"); conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
if (currentTestName.getMethodName().equals(
"testInitReplicaRecoveryDoesNotHogLock")) {
// This test requires a very long value for the xceiver stop timeout.
conf.setLong(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS);
}
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
FileSystem.setDefaultUri(conf, FileSystem.setDefaultUri(conf,
"hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort()); "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
@ -265,7 +281,7 @@ public class TestBlockRecovery {
* Two replicas are in Finalized state * Two replicas are in Finalized state
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test @Test(timeout=60000)
public void testFinalizedReplicas () throws IOException { public void testFinalizedReplicas () throws IOException {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
@ -304,7 +320,7 @@ public class TestBlockRecovery {
* One replica is Finalized and another is RBW. * One replica is Finalized and another is RBW.
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test @Test(timeout=60000)
public void testFinalizedRbwReplicas() throws IOException { public void testFinalizedRbwReplicas() throws IOException {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
@ -345,7 +361,7 @@ public class TestBlockRecovery {
* One replica is Finalized and another is RWR. * One replica is Finalized and another is RWR.
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test @Test(timeout=60000)
public void testFinalizedRwrReplicas() throws IOException { public void testFinalizedRwrReplicas() throws IOException {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
@ -387,7 +403,7 @@ public class TestBlockRecovery {
* Two replicas are RBW. * Two replicas are RBW.
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test @Test(timeout=60000)
public void testRBWReplicas() throws IOException { public void testRBWReplicas() throws IOException {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
@ -411,7 +427,7 @@ public class TestBlockRecovery {
* One replica is RBW and another is RWR. * One replica is RBW and another is RWR.
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test @Test(timeout=60000)
public void testRBW_RWRReplicas() throws IOException { public void testRBW_RWRReplicas() throws IOException {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
@ -436,7 +452,7 @@ public class TestBlockRecovery {
* Two replicas are RWR. * Two replicas are RWR.
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test @Test(timeout=60000)
public void testRWRReplicas() throws IOException { public void testRWRReplicas() throws IOException {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
@ -472,7 +488,7 @@ public class TestBlockRecovery {
* @throws IOException * @throws IOException
* in case of an error * in case of an error
*/ */
@Test @Test(timeout=60000)
public void testRecoveryInProgressException() public void testRecoveryInProgressException()
throws IOException, InterruptedException { throws IOException, InterruptedException {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
@ -497,7 +513,7 @@ public class TestBlockRecovery {
* @throws IOException * @throws IOException
* in case of an error * in case of an error
*/ */
@Test @Test(timeout=60000)
public void testErrorReplicas() throws IOException, InterruptedException { public void testErrorReplicas() throws IOException, InterruptedException {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
@ -524,7 +540,7 @@ public class TestBlockRecovery {
* *
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test @Test(timeout=60000)
public void testZeroLenReplicas() throws IOException, InterruptedException { public void testZeroLenReplicas() throws IOException, InterruptedException {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
@ -564,7 +580,7 @@ public class TestBlockRecovery {
* *
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test @Test(timeout=60000)
public void testFailedReplicaUpdate() throws IOException { public void testFailedReplicaUpdate() throws IOException {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
@ -586,7 +602,7 @@ public class TestBlockRecovery {
* *
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test @Test(timeout=60000)
public void testNoReplicaUnderRecovery() throws IOException { public void testNoReplicaUnderRecovery() throws IOException {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
@ -611,7 +627,7 @@ public class TestBlockRecovery {
* *
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test @Test(timeout=60000)
public void testNotMatchedReplicaID() throws IOException { public void testNotMatchedReplicaID() throws IOException {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
@ -712,7 +728,7 @@ public class TestBlockRecovery {
* throw an exception. * throw an exception.
* @throws Exception * @throws Exception
*/ */
@Test @Test(timeout=60000)
public void testRURReplicas() throws Exception { public void testRURReplicas() throws Exception {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
@ -742,4 +758,97 @@ public class TestBlockRecovery {
assertTrue(exceptionThrown); assertTrue(exceptionThrown);
} }
} }
/**
* Test that initReplicaRecovery does not hold the lock for an unreasonable
* amount of time if a writer is taking a long time to stop.
*/
@Test(timeout=60000)
public void testInitReplicaRecoveryDoesNotHogLock() throws Exception {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
// We need a long value for the data xceiver stop timeout.
// Otherwise the timeout will trigger, and we will not have tested that
// thread join was done locklessly.
Assert.assertEquals(
TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS,
dn.getDnConf().getXceiverStopTimeout());
final Semaphore progressParent = new Semaphore(0);
final Semaphore terminateSlowWorker = new Semaphore(0);
final AtomicBoolean failure = new AtomicBoolean(false);
Collection<RecoveringBlock> recoveringBlocks =
initRecoveringBlocks();
final RecoveringBlock recoveringBlock =
Iterators.get(recoveringBlocks.iterator(), 0);
final ExtendedBlock block = recoveringBlock.getBlock();
Thread slowWorker = new Thread(new Runnable() {
@Override
public void run() {
try {
// Register this thread as the writer for the recoveringBlock.
LOG.debug("slowWorker creating rbw");
ReplicaHandler replicaHandler =
spyDN.data.createRbw(StorageType.DISK, block, false);
replicaHandler.close();
LOG.debug("slowWorker created rbw");
// Tell the parent thread to start progressing.
progressParent.release();
while (true) {
try {
terminateSlowWorker.acquire();
break;
} catch (InterruptedException e) {
// Ignore interrupted exceptions so that the waitingWorker thread
// will have to wait for us.
}
}
LOG.debug("slowWorker exiting");
} catch (Throwable t) {
LOG.error("slowWorker got exception", t);
failure.set(true);
}
}
});
// Start the slow worker thread and wait for it to take ownership of the
// ReplicaInPipeline
slowWorker.start();
while (true) {
try {
progressParent.acquire();
break;
} catch (InterruptedException e) {
// Ignore interrupted exceptions
}
}
// Start a worker thread which will wait for the slow worker thread.
Thread waitingWorker = new Thread(new Runnable() {
@Override
public void run() {
try {
// Attempt to terminate the other worker thread and take ownership
// of the ReplicaInPipeline.
LOG.debug("waitingWorker initiating recovery");
spyDN.initReplicaRecovery(recoveringBlock);
LOG.debug("waitingWorker initiated recovery");
} catch (Throwable t) {
GenericTestUtils.assertExceptionContains("meta does not exist", t);
}
}
});
waitingWorker.start();
// Do an operation that requires the lock. This should not be blocked
// by the replica recovery in progress.
spyDN.getFSDataset().getReplicaString(
recoveringBlock.getBlock().getBlockPoolId(),
recoveringBlock.getBlock().getBlockId());
// Wait for the two worker threads to exit normally.
terminateSlowWorker.release();
slowWorker.join();
waitingWorker.join();
Assert.assertFalse("The slowWriter thread failed.", failure.get());
}
} }