HDFS-10267. Extra "synchronized" on FsDatasetImpl#recoverAppend and FsDatasetImpl#recoverClose

(cherry picked from commit 4bd7cbc29d)

Conflicts:
    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

(cherry picked from commit b5e6ad457f)
parent 497c65ad0c
commit 354801fe61
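Why the method-level "synchronized" is "extra": recoverAppend and recoverClose may have to stop and join a slow writer thread before touching replica state, and the new tests below verify that this join happens without the dataset lock held (see the "thread join was done locklessly" and "must not hold the lock during this operation" comments in the test diff). The following is a minimal illustrative sketch of that pattern only; the class and member names (RecoverySketch, datasetLock, slowWriter) are hypothetical and do not appear in FsDatasetImpl.

// Illustrative sketch only, not the actual FsDatasetImpl code.
class RecoverySketch {
  private final Object datasetLock = new Object();

  // A method-level "synchronized" here would hold datasetLock for the whole
  // call, including the potentially long join() on the slow writer, blocking
  // every other dataset operation. Instead, stop the writer without the lock
  // and take the lock only around the brief state update.
  void recover(Thread slowWriter) throws InterruptedException {
    slowWriter.interrupt();
    slowWriter.join();            // long wait, done without the lock
    synchronized (datasetLock) {  // short critical section
      // ... validate and update the replica here ...
    }
  }
}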
@@ -1271,7 +1271,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaHandler recoverAppend(
+  public ReplicaHandler recoverAppend(
       ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
     LOG.info("Recover failed append to " + b);
 
@@ -1304,7 +1304,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized Replica recoverClose(ExtendedBlock b, long newGS,
+  public Replica recoverClose(ExtendedBlock b, long newGS,
       long expectedBlockLen) throws IOException {
     LOG.info("Recover failed close " + b);
     while (true) {
@@ -42,8 +42,10 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.Iterators;
 import org.apache.commons.logging.Log;
@@ -84,6 +86,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
@@ -130,7 +133,7 @@ public class TestBlockRecovery {
   }
 
   private final long
-      TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS = 1000000000L;
+      TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS = 1000000000L;
 
   /**
    * Starts an instance of DataNode
@@ -143,11 +146,10 @@ public class TestBlockRecovery {
     conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
     conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
     conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
-    if (currentTestName.getMethodName().equals(
-        "testInitReplicaRecoveryDoesNotHogLock")) {
+    if (currentTestName.getMethodName().contains("DoesNotHoldLock")) {
       // This test requires a very long value for the xceiver stop timeout.
       conf.setLong(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
-          TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS);
+          TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS);
     }
     conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
     FileSystem.setDefaultUri(conf,
@@ -759,96 +761,216 @@ public class TestBlockRecovery {
     }
   }
 
-  /**
-   * Test that initReplicaRecovery does not hold the lock for an unreasonable
-   * amount of time if a writer is taking a long time to stop.
-   */
-  @Test(timeout=60000)
-  public void testInitReplicaRecoveryDoesNotHogLock() throws Exception {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Running " + GenericTestUtils.getMethodName());
-    }
-    // We need a long value for the data xceiver stop timeout.
-    // Otherwise the timeout will trigger, and we will not have tested that
-    // thread join was done locklessly.
-    Assert.assertEquals(
-      TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS,
-      dn.getDnConf().getXceiverStopTimeout());
-    final Semaphore progressParent = new Semaphore(0);
-    final Semaphore terminateSlowWorker = new Semaphore(0);
-    final AtomicBoolean failure = new AtomicBoolean(false);
-    Collection<RecoveringBlock> recoveringBlocks =
-        initRecoveringBlocks();
-    final RecoveringBlock recoveringBlock =
-        Iterators.get(recoveringBlocks.iterator(), 0);
-    final ExtendedBlock block = recoveringBlock.getBlock();
-    Thread slowWorker = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          // Register this thread as the writer for the recoveringBlock.
-          LOG.debug("slowWorker creating rbw");
-          ReplicaHandler replicaHandler =
-              spyDN.data.createRbw(StorageType.DISK, block, false);
-          replicaHandler.close();
-          LOG.debug("slowWorker created rbw");
-          // Tell the parent thread to start progressing.
-          progressParent.release();
-          while (true) {
-            try {
-              terminateSlowWorker.acquire();
-              break;
-            } catch (InterruptedException e) {
-              // Ignore interrupted exceptions so that the waitingWorker thread
-              // will have to wait for us.
-            }
-          }
-          LOG.debug("slowWorker exiting");
-        } catch (Throwable t) {
-          LOG.error("slowWorker got exception", t);
-          failure.set(true);
-        }
-      }
-    });
-    // Start the slow worker thread and wait for it to take ownership of the
-    // ReplicaInPipeline
-    slowWorker.start();
-    while (true) {
-      try {
-        progressParent.acquire();
-        break;
-      } catch (InterruptedException e) {
-        // Ignore interrupted exceptions
-      }
-    }
-
-    // Start a worker thread which will wait for the slow worker thread.
-    Thread waitingWorker = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          // Attempt to terminate the other worker thread and take ownership
-          // of the ReplicaInPipeline.
-          LOG.debug("waitingWorker initiating recovery");
-          spyDN.initReplicaRecovery(recoveringBlock);
-          LOG.debug("waitingWorker initiated recovery");
-        } catch (Throwable t) {
-          GenericTestUtils.assertExceptionContains("meta does not exist", t);
-        }
-      }
-    });
-    waitingWorker.start();
-
-    // Do an operation that requires the lock. This should not be blocked
-    // by the replica recovery in progress.
-    spyDN.getFSDataset().getReplicaString(
-        recoveringBlock.getBlock().getBlockPoolId(),
-        recoveringBlock.getBlock().getBlockId());
-
-    // Wait for the two worker threads to exit normally.
-    terminateSlowWorker.release();
-    slowWorker.join();
-    waitingWorker.join();
-    Assert.assertFalse("The slowWriter thread failed.", failure.get());
+  private static class TestStopWorkerSemaphore {
+    final Semaphore sem;
+
+    final AtomicBoolean gotInterruption = new AtomicBoolean(false);
+
+    TestStopWorkerSemaphore() {
+      this.sem = new Semaphore(0);
+    }
+
+    /**
+     * Attempt to acquire a sempahore within a given timeout.
+     *
+     * This is useful for unit tests where we need to ignore InterruptedException
+     * when attempting to take a semaphore, but still want to honor the overall
+     * test timeout.
+     *
+     * @param timeoutMs   The timeout in miliseconds.
+     */
+    private void uninterruptiblyAcquire(long timeoutMs) throws Exception {
+      long startTimeMs = Time.monotonicNow();
+      while (true) {
+        long remTime = startTimeMs + timeoutMs - Time.monotonicNow();
+        if (remTime < 0) {
+          throw new RuntimeException("Failed to acquire the semaphore within " +
+              timeoutMs + " milliseconds.");
+        }
+        try {
+          if (sem.tryAcquire(1, remTime, TimeUnit.MILLISECONDS)) {
+            return;
+          }
+        } catch (InterruptedException e) {
+          gotInterruption.set(true);
+        }
+      }
+    }
+  }
+
+  private interface TestStopWorkerRunnable {
+    /**
+     * Return the name of the operation that this runnable performs.
+     */
+    String opName();
+
+    /**
+     * Perform the operation.
+     */
+    void run(RecoveringBlock recoveringBlock) throws Exception;
+  }
+
+  @Test(timeout=90000)
+  public void testInitReplicaRecoveryDoesNotHoldLock() throws Exception {
+    testStopWorker(new TestStopWorkerRunnable() {
+      @Override
+      public String opName() {
+        return "initReplicaRecovery";
+      }
+
+      @Override
+      public void run(RecoveringBlock recoveringBlock) throws Exception {
+        try {
+          spyDN.initReplicaRecovery(recoveringBlock);
+        } catch (Exception e) {
+          if (!e.getMessage().contains("meta does not exist")) {
+            throw e;
+          }
+        }
+      }
+    });
+  }
+
+  @Test(timeout=90000)
+  public void testRecoverAppendDoesNotHoldLock() throws Exception {
+    testStopWorker(new TestStopWorkerRunnable() {
+      @Override
+      public String opName() {
+        return "recoverAppend";
+      }
+
+      @Override
+      public void run(RecoveringBlock recoveringBlock) throws Exception {
+        try {
+          ExtendedBlock extBlock = recoveringBlock.getBlock();
+          spyDN.getFSDataset().recoverAppend(extBlock,
+              extBlock.getGenerationStamp() + 1, extBlock.getNumBytes());
+        } catch (Exception e) {
+          if (!e.getMessage().contains(
+              "Corrupted replica ReplicaBeingWritten")) {
+            throw e;
+          }
+        }
+      }
+    });
+  }
+
+  @Test(timeout=90000)
+  public void testRecoverCloseDoesNotHoldLock() throws Exception {
+    testStopWorker(new TestStopWorkerRunnable() {
+      @Override
+      public String opName() {
+        return "recoverClose";
+      }
+
+      @Override
+      public void run(RecoveringBlock recoveringBlock) throws Exception {
+        try {
+          ExtendedBlock extBlock = recoveringBlock.getBlock();
+          spyDN.getFSDataset().recoverClose(extBlock,
+              extBlock.getGenerationStamp() + 1, extBlock.getNumBytes());
+        } catch (Exception e) {
+          if (!e.getMessage().contains(
+              "Corrupted replica ReplicaBeingWritten")) {
+            throw e;
+          }
+        }
+      }
+    });
+  }
+
+  /**
+   * Test that an FsDatasetImpl operation does not hold the lock for an
+   * unreasonable amount of time if a writer is taking a long time to stop.
+   */
+  private void testStopWorker(final TestStopWorkerRunnable tswr)
+      throws Exception {
+    LOG.debug("Running " + currentTestName.getMethodName());
+    // We need a long value for the data xceiver stop timeout.
+    // Otherwise the timeout will trigger, and we will not have tested that
+    // thread join was done locklessly.
+    Assert.assertEquals(
+      TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS,
+      dn.getDnConf().getXceiverStopTimeout());
+    final TestStopWorkerSemaphore progressParent =
+      new TestStopWorkerSemaphore();
+    final TestStopWorkerSemaphore terminateSlowWriter =
+      new TestStopWorkerSemaphore();
+    final AtomicReference<String> failure =
+      new AtomicReference<String>(null);
+    Collection<RecoveringBlock> recoveringBlocks =
+        initRecoveringBlocks();
+    final RecoveringBlock recoveringBlock =
+        Iterators.get(recoveringBlocks.iterator(), 0);
+    final ExtendedBlock block = recoveringBlock.getBlock();
+    Thread slowWriterThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          // Register this thread as the writer for the recoveringBlock.
+          LOG.debug("slowWriter creating rbw");
+          ReplicaHandler replicaHandler =
+              spyDN.data.createRbw(StorageType.DISK, block, false);
+          replicaHandler.close();
+          LOG.debug("slowWriter created rbw");
+          // Tell the parent thread to start progressing.
+          progressParent.sem.release();
+          terminateSlowWriter.uninterruptiblyAcquire(60000);
+          LOG.debug("slowWriter exiting");
+        } catch (Throwable t) {
+          LOG.error("slowWriter got exception", t);
+          failure.compareAndSet(null, "slowWriter got exception " +
+              t.getMessage());
+        }
+      }
+    });
+    // Start the slow worker thread and wait for it to take ownership of the
+    // ReplicaInPipeline
+    slowWriterThread.start();
+    progressParent.uninterruptiblyAcquire(60000);
+
+    // Start a worker thread which will attempt to stop the writer.
+    Thread stopWriterThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          LOG.debug("initiating " + tswr.opName());
+          tswr.run(recoveringBlock);
+          LOG.debug("finished " + tswr.opName());
+        } catch (Throwable t) {
+          LOG.error("stopWriterThread got unexpected exception for " +
+              tswr.opName(), t);
+          failure.compareAndSet(null, "stopWriterThread got unexpected " +
+              "exception for " + tswr.opName() + ": " + t.getMessage());
+        }
+      }
+    });
+    stopWriterThread.start();
+
+    while (!terminateSlowWriter.gotInterruption.get()) {
+      // Wait until stopWriterThread attempts to stop our slow writer by sending
+      // it an InterruptedException.
+      Thread.sleep(1);
+    }
+
+    // We know that stopWriterThread is in the process of joining our slow
+    // writer.  It must not hold the lock during this operation.
+    // In order to test that it does not, we attempt to do an operation that
+    // requires the lock-- getReplicaString.
+    spyDN.getFSDataset().getReplicaString(
+        recoveringBlock.getBlock().getBlockPoolId(),
+        recoveringBlock.getBlock().getBlockId());
+
+    // Tell the slow writer to exit, and then wait for all threads to join.
+    terminateSlowWriter.sem.release();
+    slowWriterThread.join();
+    stopWriterThread.join();
+
+    // Check that our worker threads exited cleanly.  This is not checked by the
+    // unit test framework, so we have to do it manually here.
+    String failureReason = failure.get();
+    if (failureReason != null) {
+      Assert.fail("Thread failure: " + failureReason);
+    }
   }
 }