From 880b9d24ff7b5f350ec99bac9b0862009460b486 Mon Sep 17 00:00:00 2001 From: Brahma Reddy Battula Date: Tue, 16 Jan 2018 16:13:19 +0530 Subject: [PATCH] HDFS-8693. refreshNamenodes does not support adding a new standby to a running DN. Contributed by Ajith S. --- .../hdfs/server/datanode/BPOfferService.java | 24 ++++-- .../server/datanode/TestBPOfferService.java | 77 ++++++++++++++++++- 2 files changed, 93 insertions(+), 8 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index dbf7c8d8a94..a25f6a92d82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -148,11 +148,25 @@ class BPOfferService { } Set newAddrs = Sets.newHashSet(addrs); - if (!Sets.symmetricDifference(oldAddrs, newAddrs).isEmpty()) { - // Keep things simple for now -- we can implement this at a later date. - throw new IOException( - "HA does not currently support adding a new standby to a running DN. " + - "Please do a rolling restart of DNs to reconfigure the list of NNs."); + // Process added NNs + Set addedNNs = Sets.difference(newAddrs, oldAddrs); + for (InetSocketAddress addedNN : addedNNs) { + BPServiceActor actor = new BPServiceActor(addedNN, + lifelineAddrs.get(addrs.indexOf(addedNN)), this); + actor.start(); + bpServices.add(actor); + } + + // Process removed NNs + Set removedNNs = Sets.difference(oldAddrs, newAddrs); + for (InetSocketAddress removedNN : removedNNs) { + for (BPServiceActor actor : bpServices) { + if (actor.getNNSocketAddress().equals(removedNN)) { + actor.stop(); + shutdownActor(actor); + break; + } + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index ec199260a11..4863ca18f92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -99,10 +100,10 @@ public class TestBPOfferService { private DatanodeProtocolClientSideTranslatorPB mockNN1; private DatanodeProtocolClientSideTranslatorPB mockNN2; private final NNHAStatusHeartbeat[] mockHaStatuses = - new NNHAStatusHeartbeat[2]; + new NNHAStatusHeartbeat[3]; private final DatanodeCommand[][] datanodeCommands = - new DatanodeCommand[2][0]; - private final int[] heartbeatCounts = new int[2]; + new DatanodeCommand[3][0]; + private final int[] heartbeatCounts = new int[3]; private DataNode mockDn; private FsDatasetSpi mockFSDataset; @@ -864,4 +865,74 @@ public class TestBPOfferService { assertNotNull(bpos.getActiveNN()); } + + @Test + public void testRefreshNameNodes() throws Exception { + + BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2); + + bpos.start(); + try { + waitForBothActors(bpos); + + // The DN should have register to both NNs. + Mockito.verify(mockNN1) + .registerDatanode(Mockito.any(DatanodeRegistration.class)); + Mockito.verify(mockNN2) + .registerDatanode(Mockito.any(DatanodeRegistration.class)); + + // Should get block reports from both NNs + waitForBlockReport(mockNN1); + waitForBlockReport(mockNN2); + + // When we receive a block, it should report it to both NNs + bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false); + + ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, + mockNN1); + assertEquals(1, ret.length); + assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock()); + + ret = waitForBlockReceived(FAKE_BLOCK, mockNN2); + assertEquals(1, ret.length); + assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock()); + + // add new standby + DatanodeProtocolClientSideTranslatorPB mockNN3 = setupNNMock(2); + Mockito.doReturn(mockNN3).when(mockDn) + .connectToNN(Mockito.eq(new InetSocketAddress(2))); + + ArrayList addrs = new ArrayList<>(); + ArrayList lifelineAddrs = new ArrayList<>( + addrs.size()); + // mockNN1 + addrs.add(new InetSocketAddress(0)); + lifelineAddrs.add(null); + // mockNN3 + addrs.add(new InetSocketAddress(2)); + lifelineAddrs.add(null); + + bpos.refreshNNList(addrs, lifelineAddrs); + + assertEquals(2, bpos.getBPServiceActors().size()); + // wait for handshake to run + Thread.sleep(1000); + + // verify new NN registered + Mockito.verify(mockNN3) + .registerDatanode(Mockito.any(DatanodeRegistration.class)); + + // When we receive a block, it should report it to both NNs + bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false); + + // veridfy new NN recieved block report + ret = waitForBlockReceived(FAKE_BLOCK, mockNN3); + assertEquals(1, ret.length); + assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock()); + + } finally { + bpos.stop(); + bpos.join(); + } + } }