HDFS-9917. IBR accumulate more objects when SNN was down for sometime. (Contributed by Brahma Reddy Battula)
This commit is contained in:
parent
f6b1a81812
commit
818d6b799e
|
@ -798,6 +798,11 @@ class BPServiceActor implements Runnable {
|
||||||
// and re-register
|
// and re-register
|
||||||
register(nsInfo);
|
register(nsInfo);
|
||||||
scheduler.scheduleHeartbeat();
|
scheduler.scheduleHeartbeat();
|
||||||
|
// HDFS-9917,Standby NN IBR can be very huge if standby namenode is down
|
||||||
|
// for sometime.
|
||||||
|
if (state == HAServiceState.STANDBY) {
|
||||||
|
ibrManager.clearIBRs();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -258,4 +258,13 @@ class IncrementalBlockReportManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void clearIBRs() {
|
||||||
|
pendingIBRs.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
int getPendingIBRSize() {
|
||||||
|
return pendingIBRs.size();
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -30,6 +30,7 @@ import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -48,10 +49,12 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
|
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||||
|
@ -92,7 +95,10 @@ public class TestBPOfferService {
|
||||||
|
|
||||||
private DatanodeProtocolClientSideTranslatorPB mockNN1;
|
private DatanodeProtocolClientSideTranslatorPB mockNN1;
|
||||||
private DatanodeProtocolClientSideTranslatorPB mockNN2;
|
private DatanodeProtocolClientSideTranslatorPB mockNN2;
|
||||||
private final NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2];
|
private final NNHAStatusHeartbeat[] mockHaStatuses =
|
||||||
|
new NNHAStatusHeartbeat[2];
|
||||||
|
private final DatanodeCommand[][] datanodeCommands =
|
||||||
|
new DatanodeCommand[2][0];
|
||||||
private final int[] heartbeatCounts = new int[2];
|
private final int[] heartbeatCounts = new int[2];
|
||||||
private DataNode mockDn;
|
private DataNode mockDn;
|
||||||
private FsDatasetSpi<?> mockFSDataset;
|
private FsDatasetSpi<?> mockFSDataset;
|
||||||
|
@ -147,6 +153,7 @@ public class TestBPOfferService {
|
||||||
Mockito.any(VolumeFailureSummary.class),
|
Mockito.any(VolumeFailureSummary.class),
|
||||||
Mockito.anyBoolean());
|
Mockito.anyBoolean());
|
||||||
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
|
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
|
||||||
|
datanodeCommands[nnIdx] = new DatanodeCommand[0];
|
||||||
return mock;
|
return mock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,9 +172,12 @@ public class TestBPOfferService {
|
||||||
@Override
|
@Override
|
||||||
public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
|
public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
|
||||||
heartbeatCounts[nnIdx]++;
|
heartbeatCounts[nnIdx]++;
|
||||||
return new HeartbeatResponse(new DatanodeCommand[0],
|
HeartbeatResponse heartbeatResponse = new HeartbeatResponse(
|
||||||
mockHaStatuses[nnIdx], null,
|
datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null,
|
||||||
ThreadLocalRandom.current().nextLong() | 1L);
|
ThreadLocalRandom.current().nextLong() | 1L);
|
||||||
|
//reset the command
|
||||||
|
datanodeCommands[nnIdx] = new DatanodeCommand[0];
|
||||||
|
return heartbeatResponse;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -709,4 +719,84 @@ public class TestBPOfferService {
|
||||||
bpos.join();
|
bpos.join();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* HDFS-9917 : Standby IBR accumulation when Standby was down.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testIBRClearanceForStandbyOnReRegister() throws Exception {
|
||||||
|
final BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
|
||||||
|
bpos.start();
|
||||||
|
try {
|
||||||
|
waitForInitialization(bpos);
|
||||||
|
// Should start with neither NN as active.
|
||||||
|
assertNull(bpos.getActiveNN());
|
||||||
|
// Have NN1 claim active at txid 1
|
||||||
|
mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
|
||||||
|
bpos.triggerHeartbeatForTests();
|
||||||
|
// Now mockNN1 is acting like active namenode and mockNN2 as Standby
|
||||||
|
assertSame(mockNN1, bpos.getActiveNN());
|
||||||
|
// Return nothing when active Active Namenode gets IBRs
|
||||||
|
Mockito.doNothing().when(mockNN1).blockReceivedAndDeleted(
|
||||||
|
Mockito.any(DatanodeRegistration.class), Mockito.anyString(), Mockito
|
||||||
|
.any(StorageReceivedDeletedBlocks[].class));
|
||||||
|
|
||||||
|
final IOException re = new IOException(
|
||||||
|
"Standby NN is currently not able to process IBR");
|
||||||
|
|
||||||
|
final AtomicBoolean ibrReported = new AtomicBoolean(false);
|
||||||
|
// throw exception for standby when first IBR is receieved
|
||||||
|
Mockito.doAnswer(new Answer<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
ibrReported.set(true);
|
||||||
|
throw re;
|
||||||
|
}
|
||||||
|
}).when(mockNN2).blockReceivedAndDeleted(
|
||||||
|
Mockito.any(DatanodeRegistration.class), Mockito.anyString(), Mockito
|
||||||
|
.any(StorageReceivedDeletedBlocks[].class));
|
||||||
|
|
||||||
|
DatanodeStorage storage = Mockito.mock(DatanodeStorage.class);
|
||||||
|
Mockito.doReturn(storage).when(mockFSDataset).getStorage("storage0");
|
||||||
|
// Add IBRs
|
||||||
|
bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "storage0", false);
|
||||||
|
// Send heartbeat so that the BpServiceActor can send IBR to
|
||||||
|
// namenode
|
||||||
|
bpos.triggerHeartbeatForTests();
|
||||||
|
// Wait till first IBR is received at standbyNN. Just for confirmation.
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return ibrReported.get();
|
||||||
|
}
|
||||||
|
}, 100, 1000);
|
||||||
|
|
||||||
|
// Send register command back to Datanode to reRegister().
|
||||||
|
// After reRegister IBRs should be cleared.
|
||||||
|
datanodeCommands[1] = new DatanodeCommand[] { new RegisterCommand() };
|
||||||
|
assertEquals(
|
||||||
|
"IBR size before reRegister should be non-0", 1, getStandbyIBRSize(
|
||||||
|
bpos));
|
||||||
|
bpos.triggerHeartbeatForTests();
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return getStandbyIBRSize(bpos) == 0;
|
||||||
|
}
|
||||||
|
}, 100, 1000);
|
||||||
|
} finally {
|
||||||
|
bpos.stop();
|
||||||
|
bpos.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getStandbyIBRSize(BPOfferService bpos) {
|
||||||
|
List<BPServiceActor> bpServiceActors = bpos.getBPServiceActors();
|
||||||
|
for (BPServiceActor bpServiceActor : bpServiceActors) {
|
||||||
|
if (bpServiceActor.state == HAServiceState.STANDBY) {
|
||||||
|
return bpServiceActor.getIbrManager().getPendingIBRSize();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue