HDFS-14314. fullBlockReportLeaseId should be reset after registering to NN. Contributed by star.

(cherry picked from commit 387dbe587a)
(cherry picked from commit e58ccca3ce)
(cherry picked from commit d951497f57)
(cherry picked from commit e23a448e0e)
This commit is contained in:
Wei-Chiu Chuang 2019-03-04 10:43:44 -08:00
parent 5bed24ad5f
commit d71cfe1461
2 changed files with 126 additions and 6 deletions

View File

@ -105,6 +105,7 @@ class BPServiceActor implements Runnable {
private final DataNode dn;
private final DNConf dnConf;
private long prevBlockReportId;
private long fullBlockReportLeaseId;
private final SortedSet<Integer> blockReportSizes =
Collections.synchronizedSortedSet(new TreeSet<Integer>());
private final int maxDataLength;
@ -129,6 +130,7 @@ class BPServiceActor implements Runnable {
dnConf.ibrInterval,
dn.getMetrics());
prevBlockReportId = ThreadLocalRandom.current().nextLong();
fullBlockReportLeaseId = 0;
scheduler = new Scheduler(dnConf.heartBeatInterval,
dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
dnConf.outliersReportIntervalMs);
@ -616,7 +618,6 @@ class BPServiceActor implements Runnable {
+ "; heartBeatInterval=" + dnConf.heartBeatInterval
+ (lifelineSender != null ?
"; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : ""));
long fullBlockReportLeaseId = 0;
//
// Now loop for a long time....
@ -782,6 +783,10 @@ class BPServiceActor implements Runnable {
LOG.info("Block pool " + this + " successfully registered with NN");
bpos.registrationSucceeded(this, bpRegistration);
// reset lease id whenever registered to NN.
// ask for a new lease id at the next heartbeat.
fullBlockReportLeaseId = 0;
// random short delay - helps scatter the BR from all DNs
scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs);
}

View File

@ -27,12 +27,12 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -63,6 +63,8 @@ import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
@ -92,6 +94,9 @@ public class TestBPOfferService {
private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestBPOfferService.class);
private long firstCallTime = 0;
private long secondCallTime = 0;
private long firstLeaseId = 0;
private long secondLeaseId = 0;
private long nextFullBlockReportLeaseId = 1L;
static {
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
@ -171,16 +176,24 @@ public class TestBPOfferService {
private class HeartbeatAnswer implements Answer<HeartbeatResponse> {
private final int nnIdx;
public HeartbeatAnswer(int nnIdx) {
HeartbeatAnswer(int nnIdx) {
this.nnIdx = nnIdx;
}
@Override
public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
public HeartbeatResponse answer(InvocationOnMock invocation)
throws Throwable {
heartbeatCounts[nnIdx]++;
Boolean requestFullBlockReportLease =
(Boolean) invocation.getArguments()[8];
long fullBlockReportLeaseId = 0;
if (requestFullBlockReportLease) {
fullBlockReportLeaseId = nextFullBlockReportLeaseId++;
}
LOG.info("fullBlockReportLeaseId=" + fullBlockReportLeaseId);
HeartbeatResponse heartbeatResponse = new HeartbeatResponse(
datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null,
ThreadLocalRandom.current().nextLong() | 1L);
fullBlockReportLeaseId);
//reset the command
datanodeCommands[nnIdx] = new DatanodeCommand[0];
return heartbeatResponse;
@ -188,6 +201,24 @@ public class TestBPOfferService {
}
/**
 * Mock heartbeat answer that always replies with a single
 * {@link RegisterCommand}, together with the HA status recorded for the
 * given NameNode index and a full block report lease id of 0.
 * Each invocation is counted in {@code heartbeatCounts}.
 */
private class HeartbeatRegisterAnswer implements Answer<HeartbeatResponse> {
  private final int nnIdx;

  HeartbeatRegisterAnswer(int nnIdx) {
    this.nnIdx = nnIdx;
  }

  @Override
  public HeartbeatResponse answer(InvocationOnMock invocation)
      throws Throwable {
    // Count the heartbeat before building the reply.
    heartbeatCounts[nnIdx]++;
    // The reply carries exactly one command: re-register.
    DatanodeCommand[] registerOnly = {new RegisterCommand()};
    HeartbeatResponse reply = new HeartbeatResponse(
        registerOnly, mockHaStatuses[nnIdx], null, 0L);
    return reply;
  }
}
/**
* Test that the BPOS can register to talk to two different NNs,
* sends block reports to both, etc.
@ -523,6 +554,26 @@ public class TestBPOfferService {
}, 500, 10000);
}
/**
 * Polls until {@code registerDatanode} has been invoked on the given mock
 * NameNode exactly {@code times} times.
 *
 * @param mockNN the mocked NameNode translator to verify against
 * @param times the expected number of {@code registerDatanode} invocations
 * @throws Exception if the wait helper fails; presumably this includes a
 *     timeout when the expected count is not observed -- confirm against
 *     {@code GenericTestUtils.waitFor}
 */
private void waitForRegistration(
    final DatanodeProtocolClientSideTranslatorPB mockNN, final int times)
    throws Exception {
  // NOTE(review): 500 / 10000 are presumably the poll interval and the
  // overall timeout in milliseconds -- confirm against GenericTestUtils.
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      try {
        // Honour the caller-supplied count instead of a hard-coded 2.
        // For the re-registration test the expected value is 2: once from
        // the initial handshake and once after the RegisterCommand.
        Mockito.verify(mockNN, Mockito.times(times))
            .registerDatanode(Mockito.any(DatanodeRegistration.class));
        return true;
      } catch (Throwable t) {
        // A failed verification surfaces as a Throwable; treat it as
        // "not there yet" and let the caller poll again.
        LOG.info("waiting on block registerDatanode: " + t.getMessage());
        return false;
      }
    }
  }, 500, 10000);
}
private ReceivedDeletedBlockInfo[] waitForBlockReceived(
final ExtendedBlock fakeBlock,
final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
@ -866,7 +917,7 @@ public class TestBPOfferService {
}
@Test
@Test(timeout = 30000)
public void testRefreshNameNodes() throws Exception {
BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
@ -935,4 +986,68 @@ public class TestBPOfferService {
bpos.join();
}
}
/**
 * Verifies that a full block report lease id obtained before a
 * re-registration is not reused afterwards.
 *
 * The mocked NameNode answers the first heartbeat normally (handing out
 * lease id 1 when a lease is requested), answers the second heartbeat with
 * a {@link RegisterCommand}, and answers every later heartbeat normally
 * again. A block report carrying lease id 1 is rejected with a
 * {@link ConnectException}; the test passes once a block report carrying
 * lease id 2 has been seen.
 */
@Test(timeout = 15000)
public void testRefreshLeaseId() throws Exception {
  Mockito.when(mockNN1.sendHeartbeat(
      Mockito.any(DatanodeRegistration.class),
      Mockito.any(StorageReport[].class),
      Mockito.anyLong(),
      Mockito.anyLong(),
      Mockito.anyInt(),
      Mockito.anyInt(),
      Mockito.anyInt(),
      Mockito.any(VolumeFailureSummary.class),
      Mockito.anyBoolean(),
      Mockito.any(SlowPeerReports.class),
      Mockito.any(SlowDiskReports.class)))
      // heartbeat to old NN instance
      .thenAnswer(new HeartbeatAnswer(0))
      // heartbeat to new NN instance with Register Command
      .thenAnswer(new HeartbeatRegisterAnswer(0))
      .thenAnswer(new HeartbeatAnswer(0));

  Mockito.when(mockNN1.blockReport(
      Mockito.any(DatanodeRegistration.class),
      Mockito.anyString(),
      Mockito.any(StorageBlockReport[].class),
      Mockito.any(BlockReportContext.class)))
      .thenAnswer(
          // Parameterised instead of the raw Answer type.
          new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation)
                throws Throwable {
              BlockReportContext context =
                  (BlockReportContext) invocation.getArguments()[3];
              long leaseId = context.getLeaseId();
              LOG.info("leaseId = "+leaseId);
              // leaseId == 1 means the DN is reporting with the lease it
              // got before re-registering: reject it and wait until the
              // DN requests a new lease id.
              if (leaseId == 1) {
                firstLeaseId = leaseId;
                throw new ConnectException(
                    "network is not reachable for test. ");
              } else {
                secondLeaseId = leaseId;
                return null;
              }
            }
          });

  BPOfferService bpos = setupBPOSForNNs(mockNN1);
  bpos.start();
  try {
    waitForInitialization(bpos);
    // Should call registration 2 times
    waitForRegistration(mockNN1, 2);
    assertEquals(1L, firstLeaseId);
    // NOTE(review): this loop has no bound of its own and relies on the
    // @Test timeout to fail. firstLeaseId/secondLeaseId are plain
    // (non-volatile) fields that are presumably written from the actor
    // thread -- confirm, and consider making them volatile.
    while (secondLeaseId != 2L) {
      Thread.sleep(1000);
    }
  } finally {
    bpos.stop();
  }
}
}