HDFS-9917. IBR accumulate more objects when SNN was down for sometime. (Contributed by Brahma Reddy Battula)
(cherry picked from commit818d6b799e
) (cherry picked from commitad08114b92
)
This commit is contained in:
parent
63fe2ae72c
commit
3deeb9a15f
|
@ -798,6 +798,11 @@ class BPServiceActor implements Runnable {
|
|||
// and re-register
|
||||
register(nsInfo);
|
||||
scheduler.scheduleHeartbeat();
|
||||
// HDFS-9917,Standby NN IBR can be very huge if standby namenode is down
|
||||
// for sometime.
|
||||
if (state == HAServiceState.STANDBY) {
|
||||
ibrManager.clearIBRs();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -258,4 +258,13 @@ class IncrementalBlockReportManager {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
void clearIBRs() {
|
||||
pendingIBRs.clear();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
int getPendingIBRSize() {
|
||||
return pendingIBRs.size();
|
||||
}
|
||||
}
|
|
@ -30,6 +30,7 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -48,10 +49,12 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
|||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
|
@ -92,7 +95,10 @@ public class TestBPOfferService {
|
|||
|
||||
private DatanodeProtocolClientSideTranslatorPB mockNN1;
|
||||
private DatanodeProtocolClientSideTranslatorPB mockNN2;
|
||||
private final NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2];
|
||||
private final NNHAStatusHeartbeat[] mockHaStatuses =
|
||||
new NNHAStatusHeartbeat[2];
|
||||
private final DatanodeCommand[][] datanodeCommands =
|
||||
new DatanodeCommand[2][0];
|
||||
private final int[] heartbeatCounts = new int[2];
|
||||
private DataNode mockDn;
|
||||
private FsDatasetSpi<?> mockFSDataset;
|
||||
|
@ -147,6 +153,7 @@ public class TestBPOfferService {
|
|||
Mockito.any(VolumeFailureSummary.class),
|
||||
Mockito.anyBoolean());
|
||||
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
|
||||
datanodeCommands[nnIdx] = new DatanodeCommand[0];
|
||||
return mock;
|
||||
}
|
||||
|
||||
|
@ -165,9 +172,12 @@ public class TestBPOfferService {
|
|||
@Override
|
||||
public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
|
||||
heartbeatCounts[nnIdx]++;
|
||||
return new HeartbeatResponse(new DatanodeCommand[0],
|
||||
mockHaStatuses[nnIdx], null,
|
||||
HeartbeatResponse heartbeatResponse = new HeartbeatResponse(
|
||||
datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null,
|
||||
ThreadLocalRandom.current().nextLong() | 1L);
|
||||
//reset the command
|
||||
datanodeCommands[nnIdx] = new DatanodeCommand[0];
|
||||
return heartbeatResponse;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -709,4 +719,84 @@ public class TestBPOfferService {
|
|||
bpos.join();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* HDFS-9917 : Standby IBR accumulation when Standby was down.
|
||||
*/
|
||||
@Test
|
||||
public void testIBRClearanceForStandbyOnReRegister() throws Exception {
|
||||
final BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
|
||||
bpos.start();
|
||||
try {
|
||||
waitForInitialization(bpos);
|
||||
// Should start with neither NN as active.
|
||||
assertNull(bpos.getActiveNN());
|
||||
// Have NN1 claim active at txid 1
|
||||
mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
|
||||
bpos.triggerHeartbeatForTests();
|
||||
// Now mockNN1 is acting like active namenode and mockNN2 as Standby
|
||||
assertSame(mockNN1, bpos.getActiveNN());
|
||||
// Return nothing when active Active Namenode gets IBRs
|
||||
Mockito.doNothing().when(mockNN1).blockReceivedAndDeleted(
|
||||
Mockito.any(DatanodeRegistration.class), Mockito.anyString(), Mockito
|
||||
.any(StorageReceivedDeletedBlocks[].class));
|
||||
|
||||
final IOException re = new IOException(
|
||||
"Standby NN is currently not able to process IBR");
|
||||
|
||||
final AtomicBoolean ibrReported = new AtomicBoolean(false);
|
||||
// throw exception for standby when first IBR is receieved
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
ibrReported.set(true);
|
||||
throw re;
|
||||
}
|
||||
}).when(mockNN2).blockReceivedAndDeleted(
|
||||
Mockito.any(DatanodeRegistration.class), Mockito.anyString(), Mockito
|
||||
.any(StorageReceivedDeletedBlocks[].class));
|
||||
|
||||
DatanodeStorage storage = Mockito.mock(DatanodeStorage.class);
|
||||
Mockito.doReturn(storage).when(mockFSDataset).getStorage("storage0");
|
||||
// Add IBRs
|
||||
bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "storage0", false);
|
||||
// Send heartbeat so that the BpServiceActor can send IBR to
|
||||
// namenode
|
||||
bpos.triggerHeartbeatForTests();
|
||||
// Wait till first IBR is received at standbyNN. Just for confirmation.
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return ibrReported.get();
|
||||
}
|
||||
}, 100, 1000);
|
||||
|
||||
// Send register command back to Datanode to reRegister().
|
||||
// After reRegister IBRs should be cleared.
|
||||
datanodeCommands[1] = new DatanodeCommand[] { new RegisterCommand() };
|
||||
assertEquals(
|
||||
"IBR size before reRegister should be non-0", 1, getStandbyIBRSize(
|
||||
bpos));
|
||||
bpos.triggerHeartbeatForTests();
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return getStandbyIBRSize(bpos) == 0;
|
||||
}
|
||||
}, 100, 1000);
|
||||
} finally {
|
||||
bpos.stop();
|
||||
bpos.join();
|
||||
}
|
||||
}
|
||||
|
||||
private int getStandbyIBRSize(BPOfferService bpos) {
|
||||
List<BPServiceActor> bpServiceActors = bpos.getBPServiceActors();
|
||||
for (BPServiceActor bpServiceActor : bpServiceActors) {
|
||||
if (bpServiceActor.state == HAServiceState.STANDBY) {
|
||||
return bpServiceActor.getIbrManager().getPendingIBRSize();
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue