HDFS-8693. refreshNamenodes does not support adding a new standby to a running DN. Contributed by Ajith S.
This commit is contained in:
parent
d09058b2fd
commit
880b9d24ff
|
@ -148,11 +148,25 @@ class BPOfferService {
|
||||||
}
|
}
|
||||||
Set<InetSocketAddress> newAddrs = Sets.newHashSet(addrs);
|
Set<InetSocketAddress> newAddrs = Sets.newHashSet(addrs);
|
||||||
|
|
||||||
if (!Sets.symmetricDifference(oldAddrs, newAddrs).isEmpty()) {
|
// Process added NNs
|
||||||
// Keep things simple for now -- we can implement this at a later date.
|
Set<InetSocketAddress> addedNNs = Sets.difference(newAddrs, oldAddrs);
|
||||||
throw new IOException(
|
for (InetSocketAddress addedNN : addedNNs) {
|
||||||
"HA does not currently support adding a new standby to a running DN. " +
|
BPServiceActor actor = new BPServiceActor(addedNN,
|
||||||
"Please do a rolling restart of DNs to reconfigure the list of NNs.");
|
lifelineAddrs.get(addrs.indexOf(addedNN)), this);
|
||||||
|
actor.start();
|
||||||
|
bpServices.add(actor);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process removed NNs
|
||||||
|
Set<InetSocketAddress> removedNNs = Sets.difference(oldAddrs, newAddrs);
|
||||||
|
for (InetSocketAddress removedNN : removedNNs) {
|
||||||
|
for (BPServiceActor actor : bpServices) {
|
||||||
|
if (actor.getNNSocketAddress().equals(removedNN)) {
|
||||||
|
actor.stop();
|
||||||
|
shutdownActor(actor);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -99,10 +100,10 @@ public class TestBPOfferService {
|
||||||
private DatanodeProtocolClientSideTranslatorPB mockNN1;
|
private DatanodeProtocolClientSideTranslatorPB mockNN1;
|
||||||
private DatanodeProtocolClientSideTranslatorPB mockNN2;
|
private DatanodeProtocolClientSideTranslatorPB mockNN2;
|
||||||
private final NNHAStatusHeartbeat[] mockHaStatuses =
|
private final NNHAStatusHeartbeat[] mockHaStatuses =
|
||||||
new NNHAStatusHeartbeat[2];
|
new NNHAStatusHeartbeat[3];
|
||||||
private final DatanodeCommand[][] datanodeCommands =
|
private final DatanodeCommand[][] datanodeCommands =
|
||||||
new DatanodeCommand[2][0];
|
new DatanodeCommand[3][0];
|
||||||
private final int[] heartbeatCounts = new int[2];
|
private final int[] heartbeatCounts = new int[3];
|
||||||
private DataNode mockDn;
|
private DataNode mockDn;
|
||||||
private FsDatasetSpi<?> mockFSDataset;
|
private FsDatasetSpi<?> mockFSDataset;
|
||||||
|
|
||||||
|
@ -864,4 +865,74 @@ public class TestBPOfferService {
|
||||||
assertNotNull(bpos.getActiveNN());
|
assertNotNull(bpos.getActiveNN());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRefreshNameNodes() throws Exception {
|
||||||
|
|
||||||
|
BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
|
||||||
|
|
||||||
|
bpos.start();
|
||||||
|
try {
|
||||||
|
waitForBothActors(bpos);
|
||||||
|
|
||||||
|
// The DN should have register to both NNs.
|
||||||
|
Mockito.verify(mockNN1)
|
||||||
|
.registerDatanode(Mockito.any(DatanodeRegistration.class));
|
||||||
|
Mockito.verify(mockNN2)
|
||||||
|
.registerDatanode(Mockito.any(DatanodeRegistration.class));
|
||||||
|
|
||||||
|
// Should get block reports from both NNs
|
||||||
|
waitForBlockReport(mockNN1);
|
||||||
|
waitForBlockReport(mockNN2);
|
||||||
|
|
||||||
|
// When we receive a block, it should report it to both NNs
|
||||||
|
bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
|
||||||
|
|
||||||
|
ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK,
|
||||||
|
mockNN1);
|
||||||
|
assertEquals(1, ret.length);
|
||||||
|
assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
|
||||||
|
|
||||||
|
ret = waitForBlockReceived(FAKE_BLOCK, mockNN2);
|
||||||
|
assertEquals(1, ret.length);
|
||||||
|
assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
|
||||||
|
|
||||||
|
// add new standby
|
||||||
|
DatanodeProtocolClientSideTranslatorPB mockNN3 = setupNNMock(2);
|
||||||
|
Mockito.doReturn(mockNN3).when(mockDn)
|
||||||
|
.connectToNN(Mockito.eq(new InetSocketAddress(2)));
|
||||||
|
|
||||||
|
ArrayList<InetSocketAddress> addrs = new ArrayList<>();
|
||||||
|
ArrayList<InetSocketAddress> lifelineAddrs = new ArrayList<>(
|
||||||
|
addrs.size());
|
||||||
|
// mockNN1
|
||||||
|
addrs.add(new InetSocketAddress(0));
|
||||||
|
lifelineAddrs.add(null);
|
||||||
|
// mockNN3
|
||||||
|
addrs.add(new InetSocketAddress(2));
|
||||||
|
lifelineAddrs.add(null);
|
||||||
|
|
||||||
|
bpos.refreshNNList(addrs, lifelineAddrs);
|
||||||
|
|
||||||
|
assertEquals(2, bpos.getBPServiceActors().size());
|
||||||
|
// wait for handshake to run
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
// verify new NN registered
|
||||||
|
Mockito.verify(mockNN3)
|
||||||
|
.registerDatanode(Mockito.any(DatanodeRegistration.class));
|
||||||
|
|
||||||
|
// When we receive a block, it should report it to both NNs
|
||||||
|
bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
|
||||||
|
|
||||||
|
// veridfy new NN recieved block report
|
||||||
|
ret = waitForBlockReceived(FAKE_BLOCK, mockNN3);
|
||||||
|
assertEquals(1, ret.length);
|
||||||
|
assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
bpos.stop();
|
||||||
|
bpos.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue