HDFS-15940. Fixing and refactoring tests specific to Block recovery. (#2902)

* HDFS-15940. Fixing and refactoring tests specific to Block recovery.(#2844). Contributed by Viraj Jasani
(cherry picked from commit 2b5fd341b9)

 Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

* HDFS-15940. Fix TestBlockRecovery2#testRaceBetweenReplicaRecoveryAndFinalizeBlock (ADDENDUM) (#2874)

(cherry picked from commit 56bd968fb4)
(cherry picked from commit 4994b73eeb)

Co-authored-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
Wei-Chiu Chuang 2021-04-14 20:20:23 -07:00 committed by GitHub
parent d9858a4663
commit f0331827e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 542 additions and 342 deletions

View File

@ -19,22 +19,15 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.hdfs.AppendTestUtil; import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
@ -46,16 +39,13 @@ import static org.mockito.Mockito.when;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -82,13 +72,11 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil; import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder; import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@ -103,13 +91,11 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.SleepAnswer;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.slf4j.event.Level; import org.slf4j.event.Level;
@ -304,13 +290,13 @@ public class TestBlockRecovery {
} }
} }
/** Sync two replicas */ /**
private void testSyncReplicas(ReplicaRecoveryInfo replica1, * Sync two replicas.
ReplicaRecoveryInfo replica2, */
InterDatanodeProtocol dn1, private void testSyncReplicas(ReplicaRecoveryInfo replica1,
InterDatanodeProtocol dn2, ReplicaRecoveryInfo replica2, InterDatanodeProtocol dn1,
long expectLen) throws IOException { InterDatanodeProtocol dn2) throws IOException {
DatanodeInfo[] locs = new DatanodeInfo[]{ DatanodeInfo[] locs = new DatanodeInfo[]{
mock(DatanodeInfo.class), mock(DatanodeInfo.class)}; mock(DatanodeInfo.class), mock(DatanodeInfo.class)};
RecoveringBlock rBlock = new RecoveringBlock(block, locs, RECOVERY_ID); RecoveringBlock rBlock = new RecoveringBlock(block, locs, RECOVERY_ID);
@ -321,8 +307,8 @@ public class TestBlockRecovery {
DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn2, replica2); DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn2, replica2);
syncList.add(record1); syncList.add(record1);
syncList.add(record2); syncList.add(record2);
when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(), when(dn1.updateReplicaUnderRecovery(any(ExtendedBlock.class), anyLong(),
anyLong(), anyLong())).thenReturn("storage1"); anyLong(), anyLong())).thenReturn("storage1");
when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(), when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
anyLong(), anyLong())).thenReturn("storage2"); anyLong(), anyLong())).thenReturn("storage2");
@ -331,7 +317,7 @@ public class TestBlockRecovery {
recoveryWorker.new RecoveryTaskContiguous(rBlock); recoveryWorker.new RecoveryTaskContiguous(rBlock);
RecoveryTaskContiguous.syncBlock(syncList); RecoveryTaskContiguous.syncBlock(syncList);
} }
/** /**
* BlockRecovery_02.8. * BlockRecovery_02.8.
* Two replicas are in Finalized state * Two replicas are in Finalized state
@ -342,38 +328,38 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
} }
ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); REPLICA_LEN1, GEN_STAMP - 1, ReplicaState.FINALIZED);
ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-2, ReplicaState.FINALIZED); REPLICA_LEN1, GEN_STAMP - 2, ReplicaState.FINALIZED);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); testSyncReplicas(replica1, replica2, dn1, dn2);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1); REPLICA_LEN1);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1); REPLICA_LEN1);
// two finalized replicas have different length // two finalized replicas have different length
replica1 = new ReplicaRecoveryInfo(BLOCK_ID, replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); REPLICA_LEN1, GEN_STAMP - 1, ReplicaState.FINALIZED);
replica2 = new ReplicaRecoveryInfo(BLOCK_ID, replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN2, GEN_STAMP-2, ReplicaState.FINALIZED); REPLICA_LEN2, GEN_STAMP - 2, ReplicaState.FINALIZED);
try { try {
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); testSyncReplicas(replica1, replica2, dn1, dn2);
Assert.fail("Two finalized replicas should not have different lengthes!"); Assert.fail("Two finalized replicas should not have different lengthes!");
} catch (IOException e) { } catch (IOException e) {
Assert.assertTrue(e.getMessage().startsWith( Assert.assertTrue(e.getMessage().startsWith(
"Inconsistent size of finalized replicas. ")); "Inconsistent size of finalized replicas. "));
} }
} }
/** /**
* BlockRecovery_02.9. * BlockRecovery_02.9.
* One replica is Finalized and another is RBW. * One replica is Finalized and another is RBW.
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test(timeout=60000) @Test(timeout=60000)
@ -381,80 +367,81 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
} }
// rbw and finalized replicas have the same length // rbw and finalized replicas have the same length
ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RBW); REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RBW);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); testSyncReplicas(replica1, replica2, dn1, dn2);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1); REPLICA_LEN1);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1); REPLICA_LEN1);
// rbw replica has a different length from the finalized one // rbw replica has a different length from the finalized one
replica1 = new ReplicaRecoveryInfo(BLOCK_ID, replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); REPLICA_LEN1, GEN_STAMP - 1, ReplicaState.FINALIZED);
replica2 = new ReplicaRecoveryInfo(BLOCK_ID, replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW); REPLICA_LEN2, GEN_STAMP - 2, ReplicaState.RBW);
dn1 = mock(InterDatanodeProtocol.class); dn1 = mock(InterDatanodeProtocol.class);
dn2 = mock(InterDatanodeProtocol.class); dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); testSyncReplicas(replica1, replica2, dn1, dn2);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1); REPLICA_LEN1);
verify(dn2, never()).updateReplicaUnderRecovery( verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
} }
/** /**
* BlockRecovery_02.10. * BlockRecovery_02.10.
* One replica is Finalized and another is RWR. * One replica is Finalized and another is RWR.
*
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test(timeout=60000) @Test(timeout = 60000)
public void testFinalizedRwrReplicas() throws IOException { public void testFinalizedRwrReplicas() throws IOException {
if(LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
} }
// rbw and finalized replicas have the same length // rbw and finalized replicas have the same length
ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); REPLICA_LEN1, GEN_STAMP - 1, ReplicaState.FINALIZED);
ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR); REPLICA_LEN1, GEN_STAMP - 2, ReplicaState.RWR);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); testSyncReplicas(replica1, replica2, dn1, dn2);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1); REPLICA_LEN1);
verify(dn2, never()).updateReplicaUnderRecovery( verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
// rbw replica has a different length from the finalized one // rbw replica has a different length from the finalized one
replica1 = new ReplicaRecoveryInfo(BLOCK_ID, replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); REPLICA_LEN1, GEN_STAMP - 1, ReplicaState.FINALIZED);
replica2 = new ReplicaRecoveryInfo(BLOCK_ID, replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW); REPLICA_LEN2, GEN_STAMP - 2, ReplicaState.RBW);
dn1 = mock(InterDatanodeProtocol.class); dn1 = mock(InterDatanodeProtocol.class);
dn2 = mock(InterDatanodeProtocol.class); dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); testSyncReplicas(replica1, replica2, dn1, dn2);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1); REPLICA_LEN1);
verify(dn2, never()).updateReplicaUnderRecovery( verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
} }
/** /**
* BlockRecovery_02.11. * BlockRecovery_02.11.
* Two replicas are RBW. * Two replicas are RBW.
@ -462,26 +449,27 @@ public class TestBlockRecovery {
*/ */
@Test(timeout=60000) @Test(timeout=60000)
public void testRBWReplicas() throws IOException { public void testRBWReplicas() throws IOException {
if(LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
} }
ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW); REPLICA_LEN1, GEN_STAMP - 1, ReplicaState.RBW);
ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW); REPLICA_LEN2, GEN_STAMP - 2, ReplicaState.RBW);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2); long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
testSyncReplicas(replica1, replica2, dn1, dn2, minLen); testSyncReplicas(replica1, replica2, dn1, dn2);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
} }
/** /**
* BlockRecovery_02.12. * BlockRecovery_02.12.
* One replica is RBW and another is RWR. * One replica is RBW and another is RWR.
*
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test(timeout=60000) @Test(timeout=60000)
@ -489,44 +477,45 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
} }
ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW); REPLICA_LEN1, GEN_STAMP - 1, ReplicaState.RBW);
ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR); REPLICA_LEN1, GEN_STAMP - 2, ReplicaState.RWR);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); testSyncReplicas(replica1, replica2, dn1, dn2);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
verify(dn2, never()).updateReplicaUnderRecovery( verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
} }
/** /**
* BlockRecovery_02.13. * BlockRecovery_02.13.
* Two replicas are RWR. * Two replicas are RWR.
*
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test(timeout=60000) @Test(timeout=60000)
public void testRWRReplicas() throws IOException { public void testRWRReplicas() throws IOException {
if(LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
} }
ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RWR); REPLICA_LEN1, GEN_STAMP - 1, ReplicaState.RWR);
ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RWR); REPLICA_LEN2, GEN_STAMP - 2, ReplicaState.RWR);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2); long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
testSyncReplicas(replica1, replica2, dn1, dn2, minLen); testSyncReplicas(replica1, replica2, dn1, dn2);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
} }
private Collection<RecoveringBlock> initRecoveringBlocks() throws IOException { private Collection<RecoveringBlock> initRecoveringBlocks() throws IOException {
Collection<RecoveringBlock> blocks = new ArrayList<RecoveringBlock>(1); Collection<RecoveringBlock> blocks = new ArrayList<RecoveringBlock>(1);
@ -714,132 +703,6 @@ public class TestBlockRecovery {
} }
} }
@Test(timeout = 60000)
public void testEcRecoverBlocks() throws Throwable {
// Stop the Mocked DN started in startup()
tearDown();
ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(8)
.build();
try {
cluster.waitActive();
NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
NamenodeProtocols spyNN = spy(preSpyNN);
// Delay completeFile
GenericTestUtils.DelayAnswer delayer = new GenericTestUtils.DelayAnswer(
LOG);
doAnswer(delayer).when(spyNN).complete(anyString(), anyString(), any(),
anyLong());
String topDir = "/myDir";
DFSClient client = new DFSClient(null, spyNN, conf, null);
Path file = new Path(topDir + "/testECLeaseRecover");
client.mkdirs(topDir, null, false);
client.enableErasureCodingPolicy(ecPolicy.getName());
client.setErasureCodingPolicy(topDir, ecPolicy.getName());
OutputStream stm = client.create(file.toString(), true);
// write 5MB File
AppendTestUtil.write(stm, 0, 1024 * 1024 * 5);
final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
Thread t = new Thread() {
@Override
public void run() {
try {
stm.close();
} catch (Throwable t) {
err.set(t);
}
}
};
t.start();
// Waiting for close to get to latch
delayer.waitForCall();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
return client.getNamenode().recoverLease(file.toString(),
client.getClientName());
} catch (IOException e) {
return false;
}
}
}, 5000, 24000);
delayer.proceed();
} finally {
cluster.shutdown();
}
}
/**
* Test to verify the race between finalizeBlock and Lease recovery
*
* @throws Exception
*/
@Test(timeout = 20000)
public void testRaceBetweenReplicaRecoveryAndFinalizeBlock() throws Exception {
tearDown();// Stop the Mocked DN started in startup()
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, "1000");
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
try {
cluster.waitClusterUp();
DistributedFileSystem fs = cluster.getFileSystem();
Path path = new Path("/test");
FSDataOutputStream out = fs.create(path);
out.writeBytes("data");
out.hsync();
List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs.open(path));
final LocatedBlock block = blocks.get(0);
final DataNode dataNode = cluster.getDataNodes().get(0);
final AtomicBoolean recoveryInitResult = new AtomicBoolean(true);
Thread recoveryThread = new Thread() {
@Override
public void run() {
try {
DatanodeInfo[] locations = block.getLocations();
final RecoveringBlock recoveringBlock = new RecoveringBlock(
block.getBlock(), locations, block.getBlock()
.getGenerationStamp() + 1);
try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) {
Thread.sleep(2000);
dataNode.initReplicaRecovery(recoveringBlock);
}
} catch (Exception e) {
recoveryInitResult.set(false);
}
}
};
recoveryThread.start();
try {
out.close();
} catch (IOException e) {
Assert.assertTrue("Writing should fail",
e.getMessage().contains("are bad. Aborting..."));
} finally {
recoveryThread.join();
}
Assert.assertTrue("Recovery should be initiated successfully",
recoveryInitResult.get());
dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock()
.getGenerationStamp() + 1, block.getBlock().getBlockId(),
block.getBlockSize());
} finally {
if (null != cluster) {
cluster.shutdown();
cluster = null;
}
}
}
/** /**
* DNs report RUR instead of RBW, RWR or FINALIZED. Primary DN expected to * DNs report RUR instead of RBW, RWR or FINALIZED. Primary DN expected to
* throw an exception. * throw an exception.
@ -1113,57 +976,7 @@ public class TestBlockRecovery {
} }
} }
/** static void testRecoveryWithDatanodeDelayed(
* Test for block recovery taking longer than the heartbeat interval.
*/
@Test(timeout = 300000L)
public void testRecoverySlowerThanHeartbeat() throws Exception {
tearDown(); // Stop the Mocked DN started in startup()
SleepAnswer delayer = new SleepAnswer(3000, 6000);
testRecoveryWithDatanodeDelayed(delayer);
}
/**
* Test for block recovery timeout. All recovery attempts will be delayed
* and the first attempt will be lost to trigger recovery timeout and retry.
*/
@Test(timeout = 300000L)
public void testRecoveryTimeout() throws Exception {
tearDown(); // Stop the Mocked DN started in startup()
final Random r = new Random();
// Make sure first commitBlockSynchronization call from the DN gets lost
// for the recovery timeout to expire and new recovery attempt
// to be started.
SleepAnswer delayer = new SleepAnswer(3000) {
private final AtomicBoolean callRealMethod = new AtomicBoolean();
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
boolean interrupted = false;
try {
Thread.sleep(r.nextInt(3000) + 6000);
} catch (InterruptedException ie) {
interrupted = true;
}
try {
if (callRealMethod.get()) {
return invocation.callRealMethod();
}
callRealMethod.set(true);
return null;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
};
testRecoveryWithDatanodeDelayed(delayer);
}
private void testRecoveryWithDatanodeDelayed(
GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception { GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception {
Configuration configuration = new HdfsConfiguration(); Configuration configuration = new HdfsConfiguration();
configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
@ -1215,80 +1028,4 @@ public class TestBlockRecovery {
} }
} }
/**
* Test that block will be recovered even if there are less than the
* specified minReplication datanodes involved in its recovery.
*
* Check that, after recovering, the block will be successfully replicated.
*/
@Test(timeout = 300000L)
public void testRecoveryWillIgnoreMinReplication() throws Exception {
tearDown(); // Stop the Mocked DN started in startup()
final int blockSize = 4096;
final int numReplicas = 3;
final String filename = "/testIgnoreMinReplication";
final Path filePath = new Path(filename);
Configuration configuration = new HdfsConfiguration();
configuration.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
configuration.setInt(DFS_NAMENODE_REPLICATION_MIN_KEY, 2);
configuration.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(configuration).numDataNodes(5)
.build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final FSNamesystem fsn = cluster.getNamesystem();
// Create a file and never close the output stream to trigger recovery
FSDataOutputStream out = dfs.create(filePath, (short) numReplicas);
out.write(AppendTestUtil.randomBytes(0, blockSize));
out.hsync();
DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), configuration);
LocatedBlock blk = dfsClient.getNamenode().
getBlockLocations(filename, 0, blockSize).
getLastLocatedBlock();
// Kill 2 out of 3 datanodes so that only 1 alive, thus < minReplication
List<DatanodeInfo> dataNodes = Arrays.asList(blk.getLocations());
assertEquals(dataNodes.size(), numReplicas);
for (DatanodeInfo dataNode : dataNodes.subList(0, numReplicas - 1)) {
cluster.stopDataNode(dataNode.getName());
}
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return fsn.getNumDeadDataNodes() == 2;
}
}, 300, 300000);
// Make sure hard lease expires to trigger replica recovery
cluster.setLeasePeriod(100L, 100L);
// Wait for recovery to succeed
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
return dfs.isFileClosed(filePath);
} catch (IOException e) {}
return false;
}
}, 300, 300000);
// Wait for the block to be replicated
DFSTestUtil.waitForReplication(cluster, DFSTestUtil.getFirstBlock(
dfs, filePath), 1, numReplicas, 0);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
} }

View File

@ -0,0 +1,464 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
/**
* Test part 2 for sync all replicas in block recovery.
*/
public class TestBlockRecovery2 {
private static final Logger LOG =
LoggerFactory.getLogger(TestBlockRecovery2.class);
private static final String DATA_DIR =
MiniDFSCluster.getBaseDirectory() + "data";
private DataNode dn;
private Configuration conf;
private boolean tearDownDone;
private final static String CLUSTER_ID = "testClusterID";
private final static String POOL_ID = "BP-TEST";
private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
"localhost", 5020);
@Rule
public TestName currentTestName = new TestName();
static {
GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.TRACE);
GenericTestUtils.setLogLevel(LOG, Level.TRACE);
}
/**
* Starts an instance of DataNode.
* @throws IOException
*/
@Before
public void startUp() throws IOException {
tearDownDone = false;
conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
FileSystem.setDefaultUri(conf,
"hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
List<StorageLocation> locations = new ArrayList<>();
File dataDir = new File(DATA_DIR);
FileUtil.fullyDelete(dataDir);
dataDir.mkdirs();
StorageLocation location = StorageLocation.parse(dataDir.getPath());
locations.add(location);
final DatanodeProtocolClientSideTranslatorPB namenode =
mock(DatanodeProtocolClientSideTranslatorPB.class);
Mockito.doAnswer(
(Answer<DatanodeRegistration>) invocation ->
(DatanodeRegistration) invocation.getArguments()[0])
.when(namenode)
.registerDatanode(any(DatanodeRegistration.class));
when(namenode.versionRequest())
.thenReturn(new NamespaceInfo(1, CLUSTER_ID, POOL_ID, 1L));
when(namenode.sendHeartbeat(
any(),
any(),
anyLong(),
anyLong(),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt(),
any(),
Mockito.anyBoolean(),
any(),
any()))
.thenReturn(new HeartbeatResponse(
new DatanodeCommand[0],
new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.ACTIVE, 1),
null, ThreadLocalRandom.current().nextLong() | 1L));
dn = new DataNode(conf, locations, null, null) {
@Override
DatanodeProtocolClientSideTranslatorPB connectToNN(
InetSocketAddress nnAddr) throws IOException {
Assert.assertEquals(NN_ADDR, nnAddr);
return namenode;
}
};
// Trigger a heartbeat so that it acknowledges the NN as active.
dn.getAllBpOs().get(0).triggerHeartbeatForTests();
waitForActiveNN();
}
/**
* Wait for active NN up to 15 seconds.
*/
private void waitForActiveNN() {
try {
GenericTestUtils.waitFor(() ->
dn.getAllBpOs().get(0).getActiveNN() != null, 1000, 15 * 1000);
} catch (TimeoutException e) {
// Here its not failing, will again do the assertions for activeNN after
// this waiting period and fails there if BPOS has not acknowledged
// any NN as active.
LOG.warn("Failed to get active NN", e);
} catch (InterruptedException e) {
LOG.warn("InterruptedException while waiting to see active NN", e);
}
Assert.assertNotNull("Failed to get ActiveNN",
dn.getAllBpOs().get(0).getActiveNN());
}
/**
* Cleans the resources and closes the instance of datanode.
* @throws IOException if an error occurred
*/
@After
public void tearDown() throws IOException {
if (!tearDownDone && dn != null) {
try {
dn.shutdown();
} catch(Exception e) {
LOG.error("Cannot close: ", e);
} finally {
File dir = new File(DATA_DIR);
if (dir.exists()) {
Assert.assertTrue(
"Cannot delete data-node dirs", FileUtil.fullyDelete(dir));
}
}
tearDownDone = true;
}
}
/**
* Test to verify the race between finalizeBlock and Lease recovery.
*
* @throws Exception
*/
@Test(timeout = 20000)
public void testRaceBetweenReplicaRecoveryAndFinalizeBlock()
throws Exception {
// Stop the Mocked DN started in startup()
tearDown();
Configuration configuration = new HdfsConfiguration();
configuration.setLong(
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 5000L);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(configuration)
.numDataNodes(1).build();
try {
cluster.waitClusterUp();
DistributedFileSystem fs = cluster.getFileSystem();
Path path = new Path("/test");
FSDataOutputStream out = fs.create(path);
out.writeBytes("data");
out.hsync();
List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs.open(path));
final LocatedBlock block = blocks.get(0);
final DataNode dataNode = cluster.getDataNodes().get(0);
final AtomicBoolean recoveryInitResult = new AtomicBoolean(true);
Thread recoveryThread = new Thread(() -> {
try {
DatanodeInfo[] locations = block.getLocations();
final BlockRecoveryCommand.RecoveringBlock recoveringBlock =
new BlockRecoveryCommand.RecoveringBlock(block.getBlock(),
locations, block.getBlock().getGenerationStamp() + 1);
try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) {
Thread.sleep(2000);
dataNode.initReplicaRecovery(recoveringBlock);
}
} catch (Exception e) {
LOG.error("Something went wrong.", e);
recoveryInitResult.set(false);
}
});
recoveryThread.start();
try {
out.close();
} catch (IOException e) {
Assert.assertTrue("Writing should fail",
e.getMessage().contains("are bad. Aborting..."));
} finally {
recoveryThread.join();
}
Assert.assertTrue("Recovery should be initiated successfully",
recoveryInitResult.get());
dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock()
.getGenerationStamp() + 1, block.getBlock().getBlockId(),
block.getBlockSize());
} finally {
if (null != cluster) {
cluster.shutdown();
}
}
}
/**
* Test for block recovery timeout. All recovery attempts will be delayed
* and the first attempt will be lost to trigger recovery timeout and retry.
*/
@Test(timeout = 300000L)
public void testRecoveryTimeout() throws Exception {
tearDown(); // Stop the Mocked DN started in startup()
final Random r = new Random();
// Make sure first commitBlockSynchronization call from the DN gets lost
// for the recovery timeout to expire and new recovery attempt
// to be started.
GenericTestUtils.SleepAnswer delayer =
new GenericTestUtils.SleepAnswer(3000) {
private final AtomicBoolean callRealMethod = new AtomicBoolean();
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
boolean interrupted = false;
try {
Thread.sleep(r.nextInt(3000) + 6000);
} catch (InterruptedException ie) {
interrupted = true;
}
try {
if (callRealMethod.get()) {
return invocation.callRealMethod();
}
callRealMethod.set(true);
return null;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
};
TestBlockRecovery.testRecoveryWithDatanodeDelayed(delayer);
}
/**
* Test for block recovery taking longer than the heartbeat interval.
*/
@Test(timeout = 300000L)
public void testRecoverySlowerThanHeartbeat() throws Exception {
tearDown(); // Stop the Mocked DN started in startup()
GenericTestUtils.SleepAnswer delayer =
new GenericTestUtils.SleepAnswer(3000, 6000);
TestBlockRecovery.testRecoveryWithDatanodeDelayed(delayer);
}
@Test(timeout = 60000)
public void testEcRecoverBlocks() throws Throwable {
// Stop the Mocked DN started in startup()
tearDown();
ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(8).build();
try {
cluster.waitActive();
NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
NamenodeProtocols spyNN = spy(preSpyNN);
// Delay completeFile
GenericTestUtils.DelayAnswer delayer =
new GenericTestUtils.DelayAnswer(LOG);
doAnswer(delayer).when(spyNN).complete(anyString(), anyString(), any(),
anyLong());
String topDir = "/myDir";
DFSClient client = new DFSClient(null, spyNN, conf, null);
Path file = new Path(topDir + "/testECLeaseRecover");
client.mkdirs(topDir, null, false);
client.enableErasureCodingPolicy(ecPolicy.getName());
client.setErasureCodingPolicy(topDir, ecPolicy.getName());
OutputStream stm = client.create(file.toString(), true);
// write 5MB File
AppendTestUtil.write(stm, 0, 1024 * 1024 * 5);
final AtomicReference<Throwable> err = new AtomicReference<>();
Thread t = new Thread(() -> {
try {
stm.close();
} catch (Throwable t1) {
err.set(t1);
}
});
t.start();
// Waiting for close to get to latch
delayer.waitForCall();
GenericTestUtils.waitFor(() -> {
try {
return client.getNamenode().recoverLease(file.toString(),
client.getClientName());
} catch (IOException e) {
return false;
}
}, 5000, 24000);
delayer.proceed();
} finally {
cluster.shutdown();
}
}
/**
* Test that block will be recovered even if there are less than the
* specified minReplication datanodes involved in its recovery.
*
* Check that, after recovering, the block will be successfully replicated.
*/
@Test(timeout = 300000L)
public void testRecoveryWillIgnoreMinReplication() throws Exception {
tearDown(); // Stop the Mocked DN started in startup()
final int blockSize = 4096;
final int numReplicas = 3;
final String filename = "/testIgnoreMinReplication";
final Path filePath = new Path(filename);
Configuration configuration = new HdfsConfiguration();
configuration.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
configuration.setInt(DFS_NAMENODE_REPLICATION_MIN_KEY, 2);
configuration.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(configuration).numDataNodes(5)
.build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final FSNamesystem fsn = cluster.getNamesystem();
// Create a file and never close the output stream to trigger recovery
FSDataOutputStream out = dfs.create(filePath, (short) numReplicas);
out.write(AppendTestUtil.randomBytes(0, blockSize));
out.hsync();
DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), configuration);
LocatedBlock blk = dfsClient.getNamenode().
getBlockLocations(filename, 0, blockSize).
getLastLocatedBlock();
// Kill 2 out of 3 datanodes so that only 1 alive, thus < minReplication
List<DatanodeInfo> dataNodes = Arrays.asList(blk.getLocations());
assertEquals(dataNodes.size(), numReplicas);
for (DatanodeInfo dataNode : dataNodes.subList(0, numReplicas - 1)) {
cluster.stopDataNode(dataNode.getName());
}
GenericTestUtils.waitFor(() -> fsn.getNumDeadDataNodes() == 2,
300, 300000);
// Make sure hard lease expires to trigger replica recovery
cluster.setLeasePeriod(100L, 100L);
// Wait for recovery to succeed
GenericTestUtils.waitFor(() -> {
try {
return dfs.isFileClosed(filePath);
} catch (IOException e) {
LOG.info("Something went wrong.", e);
}
return false;
}, 300, 300000);
// Wait for the block to be replicated
DFSTestUtil.waitForReplication(cluster, DFSTestUtil.getFirstBlock(
dfs, filePath), 1, numReplicas, 0);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -28,8 +28,6 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException; import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -42,12 +40,13 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Test to reconfigure some parameters for DataNode without restart * Test to reconfigure some parameters for DataNode without restart
*/ */
public class TestDataNodeReconfiguration { public class TestDataNodeReconfiguration {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(TestBlockRecovery.class); LoggerFactory.getLogger(TestBlockRecovery.class);
private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory() private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory()