HDFS-14314. fullBlockReportLeaseId should be reset after registering to NN. Contributed by star.

This commit is contained in:
Wei-Chiu Chuang 2019-03-04 10:43:44 -08:00
parent eed8b794d8
commit 387dbe587a
2 changed files with 126 additions and 6 deletions

View File

@ -107,6 +107,7 @@ class BPServiceActor implements Runnable {
private final DataNode dn; private final DataNode dn;
private final DNConf dnConf; private final DNConf dnConf;
private long prevBlockReportId; private long prevBlockReportId;
private long fullBlockReportLeaseId;
private final SortedSet<Integer> blockReportSizes = private final SortedSet<Integer> blockReportSizes =
Collections.synchronizedSortedSet(new TreeSet<>()); Collections.synchronizedSortedSet(new TreeSet<>());
private final int maxDataLength; private final int maxDataLength;
@ -131,6 +132,7 @@ class BPServiceActor implements Runnable {
dnConf.ibrInterval, dnConf.ibrInterval,
dn.getMetrics()); dn.getMetrics());
prevBlockReportId = ThreadLocalRandom.current().nextLong(); prevBlockReportId = ThreadLocalRandom.current().nextLong();
fullBlockReportLeaseId = 0;
scheduler = new Scheduler(dnConf.heartBeatInterval, scheduler = new Scheduler(dnConf.heartBeatInterval,
dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval, dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
dnConf.outliersReportIntervalMs); dnConf.outliersReportIntervalMs);
@ -638,7 +640,6 @@ class BPServiceActor implements Runnable {
+ "; heartBeatInterval=" + dnConf.heartBeatInterval + "; heartBeatInterval=" + dnConf.heartBeatInterval
+ (lifelineSender != null ? + (lifelineSender != null ?
"; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : "")); "; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : ""));
long fullBlockReportLeaseId = 0;
// //
// Now loop for a long time.... // Now loop for a long time....
@ -806,6 +807,10 @@ class BPServiceActor implements Runnable {
LOG.info("Block pool " + this + " successfully registered with NN"); LOG.info("Block pool " + this + " successfully registered with NN");
bpos.registrationSucceeded(this, bpRegistration); bpos.registrationSucceeded(this, bpRegistration);
// reset lease id whenever registered to NN.
// ask for a new lease id at the next heartbeat.
fullBlockReportLeaseId = 0;
// random short delay - helps scatter the BR from all DNs // random short delay - helps scatter the BR from all DNs
scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs); scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs);
} }

View File

@ -27,12 +27,12 @@ import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -61,6 +61,8 @@ import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
@ -90,6 +92,9 @@ public class TestBPOfferService {
private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestBPOfferService.class); private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestBPOfferService.class);
private long firstCallTime = 0; private long firstCallTime = 0;
private long secondCallTime = 0; private long secondCallTime = 0;
private long firstLeaseId = 0;
private long secondLeaseId = 0;
private long nextFullBlockReportLeaseId = 1L;
static { static {
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL); GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
@ -169,16 +174,24 @@ public class TestBPOfferService {
private class HeartbeatAnswer implements Answer<HeartbeatResponse> { private class HeartbeatAnswer implements Answer<HeartbeatResponse> {
private final int nnIdx; private final int nnIdx;
public HeartbeatAnswer(int nnIdx) { HeartbeatAnswer(int nnIdx) {
this.nnIdx = nnIdx; this.nnIdx = nnIdx;
} }
@Override @Override
public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable { public HeartbeatResponse answer(InvocationOnMock invocation)
throws Throwable {
heartbeatCounts[nnIdx]++; heartbeatCounts[nnIdx]++;
Boolean requestFullBlockReportLease =
(Boolean) invocation.getArguments()[8];
long fullBlockReportLeaseId = 0;
if (requestFullBlockReportLease) {
fullBlockReportLeaseId = nextFullBlockReportLeaseId++;
}
LOG.info("fullBlockReportLeaseId=" + fullBlockReportLeaseId);
HeartbeatResponse heartbeatResponse = new HeartbeatResponse( HeartbeatResponse heartbeatResponse = new HeartbeatResponse(
datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null, datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null,
ThreadLocalRandom.current().nextLong() | 1L); fullBlockReportLeaseId);
//reset the command //reset the command
datanodeCommands[nnIdx] = new DatanodeCommand[0]; datanodeCommands[nnIdx] = new DatanodeCommand[0];
return heartbeatResponse; return heartbeatResponse;
@ -186,6 +199,24 @@ public class TestBPOfferService {
} }
private class HeartbeatRegisterAnswer implements Answer<HeartbeatResponse> {
private final int nnIdx;
HeartbeatRegisterAnswer(int nnIdx) {
this.nnIdx = nnIdx;
}
@Override
public HeartbeatResponse answer(InvocationOnMock invocation)
throws Throwable {
heartbeatCounts[nnIdx]++;
DatanodeCommand[] cmds = new DatanodeCommand[1];
cmds[0] = new RegisterCommand();
return new HeartbeatResponse(cmds, mockHaStatuses[nnIdx],
null, 0L);
}
}
/** /**
* Test that the BPOS can register to talk to two different NNs, * Test that the BPOS can register to talk to two different NNs,
* sends block reports to both, etc. * sends block reports to both, etc.
@ -521,6 +552,26 @@ public class TestBPOfferService {
}, 500, 10000); }, 500, 10000);
} }
private void waitForRegistration(
final DatanodeProtocolClientSideTranslatorPB mockNN, int times)
throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
// The DN should have register to both NNs.
// first called by connectToNNAndHandshake, then called by reRegister.
Mockito.verify(mockNN, Mockito.times(2))
.registerDatanode(Mockito.any());
return true;
} catch (Throwable t) {
LOG.info("waiting on block registerDatanode: " + t.getMessage());
return false;
}
}
}, 500, 10000);
}
private ReceivedDeletedBlockInfo[] waitForBlockReceived( private ReceivedDeletedBlockInfo[] waitForBlockReceived(
final ExtendedBlock fakeBlock, final ExtendedBlock fakeBlock,
final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception { final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
@ -857,7 +908,7 @@ public class TestBPOfferService {
} }
@Test @Test(timeout = 30000)
public void testRefreshNameNodes() throws Exception { public void testRefreshNameNodes() throws Exception {
BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2); BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
@ -930,4 +981,68 @@ public class TestBPOfferService {
bpos.join(); bpos.join();
} }
} }
@Test(timeout = 15000)
public void testRefreshLeaseId() throws Exception {
Mockito.when(mockNN1.sendHeartbeat(
Mockito.any(DatanodeRegistration.class),
Mockito.any(StorageReport[].class),
Mockito.anyLong(),
Mockito.anyLong(),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class)))
//heartbeat to old NN instance
.thenAnswer(new HeartbeatAnswer(0))
//heartbeat to new NN instance with Register Command
.thenAnswer(new HeartbeatRegisterAnswer(0))
.thenAnswer(new HeartbeatAnswer(0));
Mockito.when(mockNN1.blockReport(
Mockito.any(DatanodeRegistration.class),
Mockito.anyString(),
Mockito.any(StorageBlockReport[].class),
Mockito.any(BlockReportContext.class)))
.thenAnswer(
new Answer() {
@Override
public Object answer(InvocationOnMock invocation)
throws Throwable {
BlockReportContext context =
(BlockReportContext) invocation.getArguments()[3];
long leaseId = context.getLeaseId();
LOG.info("leaseId = "+leaseId);
// leaseId == 1 means DN make block report with old leaseId
// just reject and wait until DN request for a new leaseId
if(leaseId == 1) {
firstLeaseId = leaseId;
throw new ConnectException(
"network is not reachable for test. ");
} else {
secondLeaseId = leaseId;
return null;
}
}
});
BPOfferService bpos = setupBPOSForNNs(mockNN1);
bpos.start();
try {
waitForInitialization(bpos);
// Should call registration 2 times
waitForRegistration(mockNN1, 2);
assertEquals(1L, firstLeaseId);
while(secondLeaseId != 2L) {
Thread.sleep(1000);
}
} finally {
bpos.stop();
}
}
} }