HDFS-8693. refreshNamenodes does not support adding a new standby to a running DN. Contributed by Ajith S.

(cherry picked from commit 880b9d24ff)
This commit is contained in:
Brahma Reddy Battula 2018-01-16 16:13:19 +05:30
parent 8985fccbbc
commit 8e7ce0eb4c
2 changed files with 93 additions and 8 deletions

View File

@ -146,11 +146,25 @@ class BPOfferService {
} }
Set<InetSocketAddress> newAddrs = Sets.newHashSet(addrs); Set<InetSocketAddress> newAddrs = Sets.newHashSet(addrs);
if (!Sets.symmetricDifference(oldAddrs, newAddrs).isEmpty()) { // Process added NNs
// Keep things simple for now -- we can implement this at a later date. Set<InetSocketAddress> addedNNs = Sets.difference(newAddrs, oldAddrs);
throw new IOException( for (InetSocketAddress addedNN : addedNNs) {
"HA does not currently support adding a new standby to a running DN. " + BPServiceActor actor = new BPServiceActor(addedNN,
"Please do a rolling restart of DNs to reconfigure the list of NNs."); lifelineAddrs.get(addrs.indexOf(addedNN)), this);
actor.start();
bpServices.add(actor);
}
// Process removed NNs
Set<InetSocketAddress> removedNNs = Sets.difference(oldAddrs, newAddrs);
for (InetSocketAddress removedNN : removedNNs) {
for (BPServiceActor actor : bpServices) {
if (actor.getNNSocketAddress().equals(removedNN)) {
actor.stop();
shutdownActor(actor);
break;
}
}
} }
} }

View File

@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -99,10 +100,10 @@ public class TestBPOfferService {
private DatanodeProtocolClientSideTranslatorPB mockNN1; private DatanodeProtocolClientSideTranslatorPB mockNN1;
private DatanodeProtocolClientSideTranslatorPB mockNN2; private DatanodeProtocolClientSideTranslatorPB mockNN2;
private final NNHAStatusHeartbeat[] mockHaStatuses = private final NNHAStatusHeartbeat[] mockHaStatuses =
new NNHAStatusHeartbeat[2]; new NNHAStatusHeartbeat[3];
private final DatanodeCommand[][] datanodeCommands = private final DatanodeCommand[][] datanodeCommands =
new DatanodeCommand[2][0]; new DatanodeCommand[3][0];
private final int[] heartbeatCounts = new int[2]; private final int[] heartbeatCounts = new int[3];
private DataNode mockDn; private DataNode mockDn;
private FsDatasetSpi<?> mockFSDataset; private FsDatasetSpi<?> mockFSDataset;
@ -864,4 +865,74 @@ public class TestBPOfferService {
assertNotNull(bpos.getActiveNN()); assertNotNull(bpos.getActiveNN());
} }
@Test
public void testRefreshNameNodes() throws Exception {
BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
bpos.start();
try {
waitForBothActors(bpos);
// The DN should have register to both NNs.
Mockito.verify(mockNN1)
.registerDatanode(Mockito.any(DatanodeRegistration.class));
Mockito.verify(mockNN2)
.registerDatanode(Mockito.any(DatanodeRegistration.class));
// Should get block reports from both NNs
waitForBlockReport(mockNN1);
waitForBlockReport(mockNN2);
// When we receive a block, it should report it to both NNs
bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK,
mockNN1);
assertEquals(1, ret.length);
assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
ret = waitForBlockReceived(FAKE_BLOCK, mockNN2);
assertEquals(1, ret.length);
assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
// add new standby
DatanodeProtocolClientSideTranslatorPB mockNN3 = setupNNMock(2);
Mockito.doReturn(mockNN3).when(mockDn)
.connectToNN(Mockito.eq(new InetSocketAddress(2)));
ArrayList<InetSocketAddress> addrs = new ArrayList<>();
ArrayList<InetSocketAddress> lifelineAddrs = new ArrayList<>(
addrs.size());
// mockNN1
addrs.add(new InetSocketAddress(0));
lifelineAddrs.add(null);
// mockNN3
addrs.add(new InetSocketAddress(2));
lifelineAddrs.add(null);
bpos.refreshNNList(addrs, lifelineAddrs);
assertEquals(2, bpos.getBPServiceActors().size());
// wait for handshake to run
Thread.sleep(1000);
// verify new NN registered
Mockito.verify(mockNN3)
.registerDatanode(Mockito.any(DatanodeRegistration.class));
// When we receive a block, it should report it to both NNs
bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
// veridfy new NN recieved block report
ret = waitForBlockReceived(FAKE_BLOCK, mockNN3);
assertEquals(1, ret.length);
assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
} finally {
bpos.stop();
bpos.join();
}
}
} }