HDFS-11576. Block recovery will fail indefinitely if recovery time > heartbeat interval. Contributed by Lukas Majercak
(cherry picked from commit 5304698dc8
)
This commit is contained in:
parent
def87db9cb
commit
482fd5a880
|
@ -624,10 +624,16 @@ public abstract class GenericTestUtils {
|
||||||
* conditions.
|
* conditions.
|
||||||
*/
|
*/
|
||||||
public static class SleepAnswer implements Answer<Object> {
|
public static class SleepAnswer implements Answer<Object> {
|
||||||
|
private final int minSleepTime;
|
||||||
private final int maxSleepTime;
|
private final int maxSleepTime;
|
||||||
private static Random r = new Random();
|
private static Random r = new Random();
|
||||||
|
|
||||||
public SleepAnswer(int maxSleepTime) {
|
public SleepAnswer(int maxSleepTime) {
|
||||||
|
this(0, maxSleepTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SleepAnswer(int minSleepTime, int maxSleepTime) {
|
||||||
|
this.minSleepTime = minSleepTime;
|
||||||
this.maxSleepTime = maxSleepTime;
|
this.maxSleepTime = maxSleepTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -635,7 +641,7 @@ public abstract class GenericTestUtils {
|
||||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||||
boolean interrupted = false;
|
boolean interrupted = false;
|
||||||
try {
|
try {
|
||||||
Thread.sleep(r.nextInt(maxSleepTime));
|
Thread.sleep(r.nextInt(maxSleepTime) + minSleepTime);
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
interrupted = true;
|
interrupted = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -164,6 +164,8 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
private static final String QUEUE_REASON_FUTURE_GENSTAMP =
|
private static final String QUEUE_REASON_FUTURE_GENSTAMP =
|
||||||
"generation stamp is in the future";
|
"generation stamp is in the future";
|
||||||
|
|
||||||
|
private static final long BLOCK_RECOVERY_TIMEOUT_MULTIPLIER = 30;
|
||||||
|
|
||||||
private final Namesystem namesystem;
|
private final Namesystem namesystem;
|
||||||
|
|
||||||
private final BlockManagerSafeMode bmSafeMode;
|
private final BlockManagerSafeMode bmSafeMode;
|
||||||
|
@ -353,6 +355,9 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
final PendingReconstructionBlocks pendingReconstruction;
|
final PendingReconstructionBlocks pendingReconstruction;
|
||||||
|
|
||||||
|
/** Stores information about block recovery attempts. */
|
||||||
|
private final PendingRecoveryBlocks pendingRecoveryBlocks;
|
||||||
|
|
||||||
/** The maximum number of replicas allowed for a block */
|
/** The maximum number of replicas allowed for a block */
|
||||||
public final short maxReplication;
|
public final short maxReplication;
|
||||||
/**
|
/**
|
||||||
|
@ -549,6 +554,12 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
}
|
}
|
||||||
this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
|
this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
|
||||||
|
|
||||||
|
long heartbeatIntervalSecs = conf.getTimeDuration(
|
||||||
|
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
||||||
|
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
|
||||||
|
long blockRecoveryTimeout = getBlockRecoveryTimeout(heartbeatIntervalSecs);
|
||||||
|
pendingRecoveryBlocks = new PendingRecoveryBlocks(blockRecoveryTimeout);
|
||||||
|
|
||||||
this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
|
this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
|
||||||
|
|
||||||
bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
|
bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
|
||||||
|
@ -4736,6 +4747,25 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Notification of a successful block recovery.
|
||||||
|
* @param block for which the recovery succeeded
|
||||||
|
*/
|
||||||
|
public void successfulBlockRecovery(BlockInfo block) {
|
||||||
|
pendingRecoveryBlocks.remove(block);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks whether a recovery attempt has been made for the given block.
|
||||||
|
* If so, checks whether that attempt has timed out.
|
||||||
|
* @param b block for which recovery is being attempted
|
||||||
|
* @return true if no recovery attempt has been made or
|
||||||
|
* the previous attempt timed out
|
||||||
|
*/
|
||||||
|
public boolean addBlockRecoveryAttempt(BlockInfo b) {
|
||||||
|
return pendingRecoveryBlocks.add(b);
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void flushBlockOps() throws IOException {
|
public void flushBlockOps() throws IOException {
|
||||||
runBlockOp(new Callable<Void>(){
|
runBlockOp(new Callable<Void>(){
|
||||||
|
@ -4863,4 +4893,14 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
}
|
}
|
||||||
return i;
|
return i;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static long getBlockRecoveryTimeout(long heartbeatIntervalSecs) {
|
||||||
|
return TimeUnit.SECONDS.toMillis(heartbeatIntervalSecs *
|
||||||
|
BLOCK_RECOVERY_TIMEOUT_MULTIPLIER);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setBlockRecoveryTimeout(long blockRecoveryTimeout) {
|
||||||
|
pendingRecoveryBlocks.setRecoveryTimeoutInterval(blockRecoveryTimeout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,143 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* PendingRecoveryBlocks tracks recovery attempts for each block and their
|
||||||
|
* timeouts to ensure we do not have multiple recoveries at the same time
|
||||||
|
* and retry only after the timeout for a recovery has expired.
|
||||||
|
*/
|
||||||
|
class PendingRecoveryBlocks {
|
||||||
|
private static final Logger LOG = BlockManager.LOG;
|
||||||
|
|
||||||
|
/** List of recovery attempts per block and the time they expire. */
|
||||||
|
private final LightWeightHashSet<BlockRecoveryAttempt> recoveryTimeouts =
|
||||||
|
new LightWeightHashSet<>();
|
||||||
|
|
||||||
|
/** The timeout for issuing a block recovery again.
|
||||||
|
* (it should be larger than the time to recover a block)
|
||||||
|
*/
|
||||||
|
private long recoveryTimeoutInterval;
|
||||||
|
|
||||||
|
PendingRecoveryBlocks(long timeout) {
|
||||||
|
this.recoveryTimeoutInterval = timeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove recovery attempt for the given block.
|
||||||
|
* @param block whose recovery attempt to remove.
|
||||||
|
*/
|
||||||
|
synchronized void remove(BlockInfo block) {
|
||||||
|
recoveryTimeouts.remove(new BlockRecoveryAttempt(block));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks whether a recovery attempt has been made for the given block.
|
||||||
|
* If so, checks whether that attempt has timed out.
|
||||||
|
* @param block block for which recovery is being attempted
|
||||||
|
* @return true if no recovery attempt has been made or
|
||||||
|
* the previous attempt timed out
|
||||||
|
*/
|
||||||
|
synchronized boolean add(BlockInfo block) {
|
||||||
|
boolean added = false;
|
||||||
|
long curTime = getTime();
|
||||||
|
BlockRecoveryAttempt recoveryAttempt =
|
||||||
|
recoveryTimeouts.getElement(new BlockRecoveryAttempt(block));
|
||||||
|
|
||||||
|
if (recoveryAttempt == null) {
|
||||||
|
BlockRecoveryAttempt newAttempt = new BlockRecoveryAttempt(
|
||||||
|
block, curTime + recoveryTimeoutInterval);
|
||||||
|
added = recoveryTimeouts.add(newAttempt);
|
||||||
|
} else if (recoveryAttempt.hasTimedOut(curTime)) {
|
||||||
|
// Previous attempt timed out, reset the timeout
|
||||||
|
recoveryAttempt.setTimeout(curTime + recoveryTimeoutInterval);
|
||||||
|
added = true;
|
||||||
|
} else {
|
||||||
|
long timeoutIn = TimeUnit.MILLISECONDS.toSeconds(
|
||||||
|
recoveryAttempt.timeoutAt - curTime);
|
||||||
|
LOG.info("Block recovery attempt for " + block + " rejected, as the " +
|
||||||
|
"previous attempt times out in " + timeoutIn + " seconds.");
|
||||||
|
}
|
||||||
|
return added;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether the given block is under recovery.
|
||||||
|
* @param b block for which to check
|
||||||
|
* @return true if the given block is being recovered
|
||||||
|
*/
|
||||||
|
synchronized boolean isUnderRecovery(BlockInfo b) {
|
||||||
|
BlockRecoveryAttempt recoveryAttempt =
|
||||||
|
recoveryTimeouts.getElement(new BlockRecoveryAttempt(b));
|
||||||
|
return recoveryAttempt != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
long getTime() {
|
||||||
|
return Time.monotonicNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
synchronized void setRecoveryTimeoutInterval(long recoveryTimeoutInterval) {
|
||||||
|
this.recoveryTimeoutInterval = recoveryTimeoutInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tracks timeout for block recovery attempt of a given block.
|
||||||
|
*/
|
||||||
|
private static class BlockRecoveryAttempt {
|
||||||
|
private final BlockInfo blockInfo;
|
||||||
|
private long timeoutAt;
|
||||||
|
|
||||||
|
private BlockRecoveryAttempt(BlockInfo blockInfo) {
|
||||||
|
this(blockInfo, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
BlockRecoveryAttempt(BlockInfo blockInfo, long timeoutAt) {
|
||||||
|
this.blockInfo = blockInfo;
|
||||||
|
this.timeoutAt = timeoutAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean hasTimedOut(long currentTime) {
|
||||||
|
return currentTime > timeoutAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setTimeout(long newTimeoutAt) {
|
||||||
|
this.timeoutAt = newTimeoutAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return blockInfo.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (obj instanceof BlockRecoveryAttempt) {
|
||||||
|
return this.blockInfo.equals(((BlockRecoveryAttempt) obj).blockInfo);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -3299,25 +3299,30 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
+ "Removed empty last block and closed file " + src);
|
+ "Removed empty last block and closed file " + src);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
// start recovery of the last block for this file
|
// Start recovery of the last block for this file
|
||||||
long blockRecoveryId = nextGenerationStamp(
|
// Only do so if there is no ongoing recovery for this block,
|
||||||
blockManager.isLegacyBlock(lastBlock));
|
// or the previous recovery for this block timed out.
|
||||||
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
|
if (blockManager.addBlockRecoveryAttempt(lastBlock)) {
|
||||||
if(copyOnTruncate) {
|
long blockRecoveryId = nextGenerationStamp(
|
||||||
lastBlock.setGenerationStamp(blockRecoveryId);
|
blockManager.isLegacyBlock(lastBlock));
|
||||||
} else if(truncateRecovery) {
|
if(copyOnTruncate) {
|
||||||
recoveryBlock.setGenerationStamp(blockRecoveryId);
|
lastBlock.setGenerationStamp(blockRecoveryId);
|
||||||
}
|
} else if(truncateRecovery) {
|
||||||
uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
|
recoveryBlock.setGenerationStamp(blockRecoveryId);
|
||||||
leaseManager.renewLease(lease);
|
}
|
||||||
// Cannot close file right now, since the last block requires recovery.
|
uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
|
||||||
// This may potentially cause infinite loop in lease recovery
|
|
||||||
// if there are no valid replicas on data-nodes.
|
// Cannot close file right now, since the last block requires recovery.
|
||||||
NameNode.stateChangeLog.warn(
|
// This may potentially cause infinite loop in lease recovery
|
||||||
"DIR* NameSystem.internalReleaseLease: " +
|
// if there are no valid replicas on data-nodes.
|
||||||
|
NameNode.stateChangeLog.warn(
|
||||||
|
"DIR* NameSystem.internalReleaseLease: " +
|
||||||
"File " + src + " has not been closed." +
|
"File " + src + " has not been closed." +
|
||||||
" Lease recovery is in progress. " +
|
" Lease recovery is in progress. " +
|
||||||
"RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
|
"RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
|
||||||
|
}
|
||||||
|
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
|
||||||
|
leaseManager.renewLease(lease);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -3585,6 +3590,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
// If this commit does not want to close the file, persist blocks
|
// If this commit does not want to close the file, persist blocks
|
||||||
FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
|
FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
|
||||||
}
|
}
|
||||||
|
blockManager.successfulBlockRecovery(storedBlock);
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock("commitBlockSynchronization");
|
writeUnlock("commitBlockSynchronization");
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,87 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class contains unit tests for PendingRecoveryBlocks.java functionality.
|
||||||
|
*/
|
||||||
|
public class TestPendingRecoveryBlocks {
|
||||||
|
|
||||||
|
private PendingRecoveryBlocks pendingRecoveryBlocks;
|
||||||
|
private final long recoveryTimeout = 1000L;
|
||||||
|
|
||||||
|
private final BlockInfo blk1 = getBlock(1);
|
||||||
|
private final BlockInfo blk2 = getBlock(2);
|
||||||
|
private final BlockInfo blk3 = getBlock(3);
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
pendingRecoveryBlocks =
|
||||||
|
Mockito.spy(new PendingRecoveryBlocks(recoveryTimeout));
|
||||||
|
}
|
||||||
|
|
||||||
|
BlockInfo getBlock(long blockId) {
|
||||||
|
return new BlockInfoContiguous(new Block(blockId), (short) 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddDifferentBlocks() {
|
||||||
|
assertTrue(pendingRecoveryBlocks.add(blk1));
|
||||||
|
assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk1));
|
||||||
|
assertTrue(pendingRecoveryBlocks.add(blk2));
|
||||||
|
assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk2));
|
||||||
|
assertTrue(pendingRecoveryBlocks.add(blk3));
|
||||||
|
assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk3));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddAndRemoveBlocks() {
|
||||||
|
// Add blocks
|
||||||
|
assertTrue(pendingRecoveryBlocks.add(blk1));
|
||||||
|
assertTrue(pendingRecoveryBlocks.add(blk2));
|
||||||
|
|
||||||
|
// Remove blk1
|
||||||
|
pendingRecoveryBlocks.remove(blk1);
|
||||||
|
|
||||||
|
// Adding back blk1 should succeed
|
||||||
|
assertTrue(pendingRecoveryBlocks.add(blk1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddBlockWithPreviousRecoveryTimedOut() {
|
||||||
|
// Add blk
|
||||||
|
Mockito.doReturn(0L).when(pendingRecoveryBlocks).getTime();
|
||||||
|
assertTrue(pendingRecoveryBlocks.add(blk1));
|
||||||
|
|
||||||
|
// Should fail, has not timed out yet
|
||||||
|
Mockito.doReturn(recoveryTimeout / 2).when(pendingRecoveryBlocks).getTime();
|
||||||
|
assertFalse(pendingRecoveryBlocks.add(blk1));
|
||||||
|
|
||||||
|
// Should succeed after timing out
|
||||||
|
Mockito.doReturn(recoveryTimeout * 2).when(pendingRecoveryBlocks).getTime();
|
||||||
|
assertTrue(pendingRecoveryBlocks.add(blk1));
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,7 +18,10 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.AppendTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
|
@ -43,6 +46,7 @@ import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -94,6 +98,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils.SleepAnswer;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
|
@ -1035,4 +1040,107 @@ public class TestBlockRecovery {
|
||||||
Assert.fail("Thread failure: " + failureReason);
|
Assert.fail("Thread failure: " + failureReason);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for block recovery taking longer than the heartbeat interval.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 300000L)
|
||||||
|
public void testRecoverySlowerThanHeartbeat() throws Exception {
|
||||||
|
tearDown(); // Stop the Mocked DN started in startup()
|
||||||
|
|
||||||
|
SleepAnswer delayer = new SleepAnswer(3000, 6000);
|
||||||
|
testRecoveryWithDatanodeDelayed(delayer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for block recovery timeout. All recovery attempts will be delayed
|
||||||
|
* and the first attempt will be lost to trigger recovery timeout and retry.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 300000L)
|
||||||
|
public void testRecoveryTimeout() throws Exception {
|
||||||
|
tearDown(); // Stop the Mocked DN started in startup()
|
||||||
|
final Random r = new Random();
|
||||||
|
|
||||||
|
// Make sure first commitBlockSynchronization call from the DN gets lost
|
||||||
|
// for the recovery timeout to expire and new recovery attempt
|
||||||
|
// to be started.
|
||||||
|
SleepAnswer delayer = new SleepAnswer(3000) {
|
||||||
|
private final AtomicBoolean callRealMethod = new AtomicBoolean();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
boolean interrupted = false;
|
||||||
|
try {
|
||||||
|
Thread.sleep(r.nextInt(3000) + 6000);
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
interrupted = true;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
if (callRealMethod.get()) {
|
||||||
|
return invocation.callRealMethod();
|
||||||
|
}
|
||||||
|
callRealMethod.set(true);
|
||||||
|
return null;
|
||||||
|
} finally {
|
||||||
|
if (interrupted) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
testRecoveryWithDatanodeDelayed(delayer);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testRecoveryWithDatanodeDelayed(
|
||||||
|
GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception {
|
||||||
|
Configuration configuration = new HdfsConfiguration();
|
||||||
|
configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(configuration)
|
||||||
|
.numDataNodes(2).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
final FSNamesystem ns = cluster.getNamesystem();
|
||||||
|
final NameNode nn = cluster.getNameNode();
|
||||||
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
|
ns.getBlockManager().setBlockRecoveryTimeout(
|
||||||
|
TimeUnit.SECONDS.toMillis(10));
|
||||||
|
|
||||||
|
// Create a file and never close the output stream to trigger recovery
|
||||||
|
FSDataOutputStream out = dfs.create(new Path("/testSlowRecovery"),
|
||||||
|
(short) 2);
|
||||||
|
out.write(AppendTestUtil.randomBytes(0, 4096));
|
||||||
|
out.hsync();
|
||||||
|
|
||||||
|
List<DataNode> dataNodes = cluster.getDataNodes();
|
||||||
|
for (DataNode datanode : dataNodes) {
|
||||||
|
DatanodeProtocolClientSideTranslatorPB nnSpy =
|
||||||
|
InternalDataNodeTestUtils.spyOnBposToNN(datanode, nn);
|
||||||
|
|
||||||
|
Mockito.doAnswer(recoveryDelayer).when(nnSpy).
|
||||||
|
commitBlockSynchronization(
|
||||||
|
Mockito.any(ExtendedBlock.class), Mockito.anyInt(),
|
||||||
|
Mockito.anyLong(), Mockito.anyBoolean(),
|
||||||
|
Mockito.anyBoolean(), Mockito.anyObject(),
|
||||||
|
Mockito.anyObject());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure hard lease expires to trigger replica recovery
|
||||||
|
cluster.setLeasePeriod(100L, 100L);
|
||||||
|
|
||||||
|
// Wait for recovery to succeed
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return ns.getCompleteBlocksTotal() > 0;
|
||||||
|
}
|
||||||
|
}, 300, 300000);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -278,12 +279,14 @@ public class TestPipelinesFailover {
|
||||||
// Disable permissions so that another user can recover the lease.
|
// Disable permissions so that another user can recover the lease.
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
|
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
|
||||||
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||||
|
|
||||||
FSDataOutputStream stm = null;
|
FSDataOutputStream stm = null;
|
||||||
final MiniDFSCluster cluster = newMiniCluster(conf, 3);
|
final MiniDFSCluster cluster = newMiniCluster(conf, 3);
|
||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
cluster.transitionToActive(0);
|
cluster.transitionToActive(0);
|
||||||
|
cluster.getNamesystem().getBlockManager().setBlockRecoveryTimeout(
|
||||||
|
TimeUnit.SECONDS.toMillis(1));
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
|
|
||||||
LOG.info("Starting with NN 0 active");
|
LOG.info("Starting with NN 0 active");
|
||||||
|
|
Loading…
Reference in New Issue