From 53bbef3802194b7a0a3ce5cd3c91def9e88856e3 Mon Sep 17 00:00:00 2001 From: Chris Douglas Date: Fri, 1 Dec 2017 11:19:01 -0800 Subject: [PATCH] Revert "HDFS-11576. Block recovery will fail indefinitely if recovery time > heartbeat interval. Contributed by Lukas Majercak" This reverts commit 5304698dc8c5667c33e6ed9c4a827ef57172a723. --- .../apache/hadoop/test/GenericTestUtils.java | 10 +- .../server/blockmanagement/BlockManager.java | 40 ----- .../PendingRecoveryBlocks.java | 143 ------------------ .../hdfs/server/namenode/FSNamesystem.java | 40 +++-- .../TestPendingRecoveryBlocks.java | 87 ----------- .../server/datanode/TestBlockRecovery.java | 108 ------------- .../namenode/ha/TestPipelinesFailover.java | 5 +- 7 files changed, 20 insertions(+), 413 deletions(-) delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java index cdde48ce575..0db6c73e279 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java @@ -641,16 +641,10 @@ public abstract class GenericTestUtils { * conditions. */ public static class SleepAnswer implements Answer { - private final int minSleepTime; private final int maxSleepTime; private static Random r = new Random(); - + public SleepAnswer(int maxSleepTime) { - this(0, maxSleepTime); - } - - public SleepAnswer(int minSleepTime, int maxSleepTime) { - this.minSleepTime = minSleepTime; this.maxSleepTime = maxSleepTime; } @@ -658,7 +652,7 @@ public abstract class GenericTestUtils { public Object answer(InvocationOnMock invocation) throws Throwable { boolean interrupted = false; try { - Thread.sleep(r.nextInt(maxSleepTime) + minSleepTime); + Thread.sleep(r.nextInt(maxSleepTime)); } catch (InterruptedException ie) { interrupted = true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 1cdb159d981..4986027b04a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -164,8 +164,6 @@ public class BlockManager implements BlockStatsMXBean { private static final String QUEUE_REASON_FUTURE_GENSTAMP = "generation stamp is in the future"; - private static final long BLOCK_RECOVERY_TIMEOUT_MULTIPLIER = 30; - private final Namesystem namesystem; private final BlockManagerSafeMode bmSafeMode; @@ -355,9 +353,6 @@ public class BlockManager implements BlockStatsMXBean { @VisibleForTesting final PendingReconstructionBlocks pendingReconstruction; - /** Stores information about block recovery attempts. */ - private final PendingRecoveryBlocks pendingRecoveryBlocks; - /** The maximum number of replicas allowed for a block */ public final short maxReplication; /** @@ -554,12 +549,6 @@ public class BlockManager implements BlockStatsMXBean { } this.minReplicationToBeInMaintenance = (short)minMaintenanceR; - long heartbeatIntervalSecs = conf.getTimeDuration( - DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, - DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS); - long blockRecoveryTimeout = getBlockRecoveryTimeout(heartbeatIntervalSecs); - pendingRecoveryBlocks = new PendingRecoveryBlocks(blockRecoveryTimeout); - this.blockReportLeaseManager = new BlockReportLeaseManager(conf); bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf); @@ -4747,25 +4736,6 @@ public class BlockManager implements BlockStatsMXBean { } } - /** - * Notification of a successful block recovery. - * @param block for which the recovery succeeded - */ - public void successfulBlockRecovery(BlockInfo block) { - pendingRecoveryBlocks.remove(block); - } - - /** - * Checks whether a recovery attempt has been made for the given block. - * If so, checks whether that attempt has timed out. - * @param b block for which recovery is being attempted - * @return true if no recovery attempt has been made or - * the previous attempt timed out - */ - public boolean addBlockRecoveryAttempt(BlockInfo b) { - return pendingRecoveryBlocks.add(b); - } - @VisibleForTesting public void flushBlockOps() throws IOException { runBlockOp(new Callable(){ @@ -4893,14 +4863,4 @@ public class BlockManager implements BlockStatsMXBean { } return i; } - - private static long getBlockRecoveryTimeout(long heartbeatIntervalSecs) { - return TimeUnit.SECONDS.toMillis(heartbeatIntervalSecs * - BLOCK_RECOVERY_TIMEOUT_MULTIPLIER); - } - - @VisibleForTesting - public void setBlockRecoveryTimeout(long blockRecoveryTimeout) { - pendingRecoveryBlocks.setRecoveryTimeoutInterval(blockRecoveryTimeout); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java deleted file mode 100644 index 3f5f27c8190..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.blockmanagement; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hdfs.util.LightWeightHashSet; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; - -import java.util.concurrent.TimeUnit; - -/** - * PendingRecoveryBlocks tracks recovery attempts for each block and their - * timeouts to ensure we do not have multiple recoveries at the same time - * and retry only after the timeout for a recovery has expired. - */ -class PendingRecoveryBlocks { - private static final Logger LOG = BlockManager.LOG; - - /** List of recovery attempts per block and the time they expire. */ - private final LightWeightHashSet recoveryTimeouts = - new LightWeightHashSet<>(); - - /** The timeout for issuing a block recovery again. - * (it should be larger than the time to recover a block) - */ - private long recoveryTimeoutInterval; - - PendingRecoveryBlocks(long timeout) { - this.recoveryTimeoutInterval = timeout; - } - - /** - * Remove recovery attempt for the given block. - * @param block whose recovery attempt to remove. - */ - synchronized void remove(BlockInfo block) { - recoveryTimeouts.remove(new BlockRecoveryAttempt(block)); - } - - /** - * Checks whether a recovery attempt has been made for the given block. - * If so, checks whether that attempt has timed out. - * @param block block for which recovery is being attempted - * @return true if no recovery attempt has been made or - * the previous attempt timed out - */ - synchronized boolean add(BlockInfo block) { - boolean added = false; - long curTime = getTime(); - BlockRecoveryAttempt recoveryAttempt = - recoveryTimeouts.getElement(new BlockRecoveryAttempt(block)); - - if (recoveryAttempt == null) { - BlockRecoveryAttempt newAttempt = new BlockRecoveryAttempt( - block, curTime + recoveryTimeoutInterval); - added = recoveryTimeouts.add(newAttempt); - } else if (recoveryAttempt.hasTimedOut(curTime)) { - // Previous attempt timed out, reset the timeout - recoveryAttempt.setTimeout(curTime + recoveryTimeoutInterval); - added = true; - } else { - long timeoutIn = TimeUnit.MILLISECONDS.toSeconds( - recoveryAttempt.timeoutAt - curTime); - LOG.info("Block recovery attempt for " + block + " rejected, as the " + - "previous attempt times out in " + timeoutIn + " seconds."); - } - return added; - } - - /** - * Check whether the given block is under recovery. - * @param b block for which to check - * @return true if the given block is being recovered - */ - synchronized boolean isUnderRecovery(BlockInfo b) { - BlockRecoveryAttempt recoveryAttempt = - recoveryTimeouts.getElement(new BlockRecoveryAttempt(b)); - return recoveryAttempt != null; - } - - long getTime() { - return Time.monotonicNow(); - } - - @VisibleForTesting - synchronized void setRecoveryTimeoutInterval(long recoveryTimeoutInterval) { - this.recoveryTimeoutInterval = recoveryTimeoutInterval; - } - - /** - * Tracks timeout for block recovery attempt of a given block. - */ - private static class BlockRecoveryAttempt { - private final BlockInfo blockInfo; - private long timeoutAt; - - private BlockRecoveryAttempt(BlockInfo blockInfo) { - this(blockInfo, 0); - } - - BlockRecoveryAttempt(BlockInfo blockInfo, long timeoutAt) { - this.blockInfo = blockInfo; - this.timeoutAt = timeoutAt; - } - - boolean hasTimedOut(long currentTime) { - return currentTime > timeoutAt; - } - - void setTimeout(long newTimeoutAt) { - this.timeoutAt = newTimeoutAt; - } - - @Override - public int hashCode() { - return blockInfo.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof BlockRecoveryAttempt) { - return this.blockInfo.equals(((BlockRecoveryAttempt) obj).blockInfo); - } - return false; - } - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 6a890e2907a..d3d9cdc42da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3318,30 +3318,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, + "Removed empty last block and closed file " + src); return true; } - // Start recovery of the last block for this file - // Only do so if there is no ongoing recovery for this block, - // or the previous recovery for this block timed out. - if (blockManager.addBlockRecoveryAttempt(lastBlock)) { - long blockRecoveryId = nextGenerationStamp( - blockManager.isLegacyBlock(lastBlock)); - if(copyOnTruncate) { - lastBlock.setGenerationStamp(blockRecoveryId); - } else if(truncateRecovery) { - recoveryBlock.setGenerationStamp(blockRecoveryId); - } - uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true); - - // Cannot close file right now, since the last block requires recovery. - // This may potentially cause infinite loop in lease recovery - // if there are no valid replicas on data-nodes. - NameNode.stateChangeLog.warn( - "DIR* NameSystem.internalReleaseLease: " + - "File " + src + " has not been closed." + - " Lease recovery is in progress. " + - "RecoveryId = " + blockRecoveryId + " for block " + lastBlock); - } + // start recovery of the last block for this file + long blockRecoveryId = nextGenerationStamp( + blockManager.isLegacyBlock(lastBlock)); lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile); + if(copyOnTruncate) { + lastBlock.setGenerationStamp(blockRecoveryId); + } else if(truncateRecovery) { + recoveryBlock.setGenerationStamp(blockRecoveryId); + } + uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true); leaseManager.renewLease(lease); + // Cannot close file right now, since the last block requires recovery. + // This may potentially cause infinite loop in lease recovery + // if there are no valid replicas on data-nodes. + NameNode.stateChangeLog.warn( + "DIR* NameSystem.internalReleaseLease: " + + "File " + src + " has not been closed." + + " Lease recovery is in progress. " + + "RecoveryId = " + blockRecoveryId + " for block " + lastBlock); break; } return false; @@ -3609,7 +3604,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, // If this commit does not want to close the file, persist blocks FSDirWriteFileOp.persistBlocks(dir, src, iFile, false); } - blockManager.successfulBlockRecovery(storedBlock); } finally { writeUnlock("commitBlockSynchronization"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java deleted file mode 100644 index baad89f5492..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.blockmanagement; - -import org.apache.hadoop.hdfs.protocol.Block; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -/** - * This class contains unit tests for PendingRecoveryBlocks.java functionality. - */ -public class TestPendingRecoveryBlocks { - - private PendingRecoveryBlocks pendingRecoveryBlocks; - private final long recoveryTimeout = 1000L; - - private final BlockInfo blk1 = getBlock(1); - private final BlockInfo blk2 = getBlock(2); - private final BlockInfo blk3 = getBlock(3); - - @Before - public void setUp() { - pendingRecoveryBlocks = - Mockito.spy(new PendingRecoveryBlocks(recoveryTimeout)); - } - - BlockInfo getBlock(long blockId) { - return new BlockInfoContiguous(new Block(blockId), (short) 0); - } - - @Test - public void testAddDifferentBlocks() { - assertTrue(pendingRecoveryBlocks.add(blk1)); - assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk1)); - assertTrue(pendingRecoveryBlocks.add(blk2)); - assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk2)); - assertTrue(pendingRecoveryBlocks.add(blk3)); - assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk3)); - } - - @Test - public void testAddAndRemoveBlocks() { - // Add blocks - assertTrue(pendingRecoveryBlocks.add(blk1)); - assertTrue(pendingRecoveryBlocks.add(blk2)); - - // Remove blk1 - pendingRecoveryBlocks.remove(blk1); - - // Adding back blk1 should succeed - assertTrue(pendingRecoveryBlocks.add(blk1)); - } - - @Test - public void testAddBlockWithPreviousRecoveryTimedOut() { - // Add blk - Mockito.doReturn(0L).when(pendingRecoveryBlocks).getTime(); - assertTrue(pendingRecoveryBlocks.add(blk1)); - - // Should fail, has not timed out yet - Mockito.doReturn(recoveryTimeout / 2).when(pendingRecoveryBlocks).getTime(); - assertFalse(pendingRecoveryBlocks.add(blk1)); - - // Should succeed after timing out - Mockito.doReturn(recoveryTimeout * 2).when(pendingRecoveryBlocks).getTime(); - assertTrue(pendingRecoveryBlocks.add(blk1)); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 208447dfba6..311d5a67c39 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -18,10 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode; -import org.apache.hadoop.hdfs.AppendTestUtil; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; - import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -46,7 +43,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -98,7 +94,6 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.test.GenericTestUtils.SleepAnswer; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Time; import org.apache.log4j.Level; @@ -1040,107 +1035,4 @@ public class TestBlockRecovery { Assert.fail("Thread failure: " + failureReason); } } - - /** - * Test for block recovery taking longer than the heartbeat interval. - */ - @Test(timeout = 300000L) - public void testRecoverySlowerThanHeartbeat() throws Exception { - tearDown(); // Stop the Mocked DN started in startup() - - SleepAnswer delayer = new SleepAnswer(3000, 6000); - testRecoveryWithDatanodeDelayed(delayer); - } - - /** - * Test for block recovery timeout. All recovery attempts will be delayed - * and the first attempt will be lost to trigger recovery timeout and retry. - */ - @Test(timeout = 300000L) - public void testRecoveryTimeout() throws Exception { - tearDown(); // Stop the Mocked DN started in startup() - final Random r = new Random(); - - // Make sure first commitBlockSynchronization call from the DN gets lost - // for the recovery timeout to expire and new recovery attempt - // to be started. - SleepAnswer delayer = new SleepAnswer(3000) { - private final AtomicBoolean callRealMethod = new AtomicBoolean(); - - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - boolean interrupted = false; - try { - Thread.sleep(r.nextInt(3000) + 6000); - } catch (InterruptedException ie) { - interrupted = true; - } - try { - if (callRealMethod.get()) { - return invocation.callRealMethod(); - } - callRealMethod.set(true); - return null; - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - }; - testRecoveryWithDatanodeDelayed(delayer); - } - - private void testRecoveryWithDatanodeDelayed( - GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception { - Configuration configuration = new HdfsConfiguration(); - configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); - MiniDFSCluster cluster = null; - - try { - cluster = new MiniDFSCluster.Builder(configuration) - .numDataNodes(2).build(); - cluster.waitActive(); - final FSNamesystem ns = cluster.getNamesystem(); - final NameNode nn = cluster.getNameNode(); - final DistributedFileSystem dfs = cluster.getFileSystem(); - ns.getBlockManager().setBlockRecoveryTimeout( - TimeUnit.SECONDS.toMillis(10)); - - // Create a file and never close the output stream to trigger recovery - FSDataOutputStream out = dfs.create(new Path("/testSlowRecovery"), - (short) 2); - out.write(AppendTestUtil.randomBytes(0, 4096)); - out.hsync(); - - List dataNodes = cluster.getDataNodes(); - for (DataNode datanode : dataNodes) { - DatanodeProtocolClientSideTranslatorPB nnSpy = - InternalDataNodeTestUtils.spyOnBposToNN(datanode, nn); - - Mockito.doAnswer(recoveryDelayer).when(nnSpy). - commitBlockSynchronization( - Mockito.any(ExtendedBlock.class), Mockito.anyInt(), - Mockito.anyLong(), Mockito.anyBoolean(), - Mockito.anyBoolean(), Mockito.anyObject(), - Mockito.anyObject()); - } - - // Make sure hard lease expires to trigger replica recovery - cluster.setLeasePeriod(100L, 100L); - - // Wait for recovery to succeed - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - return ns.getCompleteBlocksTotal() > 0; - } - }, 300, 300000); - - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java index a5655787a91..dc7f47a79cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java @@ -25,7 +25,6 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.Random; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; @@ -279,14 +278,12 @@ public class TestPipelinesFailover { // Disable permissions so that another user can recover the lease. conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false); conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); - + FSDataOutputStream stm = null; final MiniDFSCluster cluster = newMiniCluster(conf, 3); try { cluster.waitActive(); cluster.transitionToActive(0); - cluster.getNamesystem().getBlockManager().setBlockRecoveryTimeout( - TimeUnit.SECONDS.toMillis(1)); Thread.sleep(500); LOG.info("Starting with NN 0 active");