HDFS-14314. fullBlockReportLeaseId should be reset after registering to NN. Contributed by star.
(cherry picked from commit387dbe587a
) (cherry picked from commite58ccca3ce
)
This commit is contained in:
parent
e663a6af89
commit
d951497f57
|
@ -105,6 +105,7 @@ class BPServiceActor implements Runnable {
|
|||
private final DataNode dn;
|
||||
private final DNConf dnConf;
|
||||
private long prevBlockReportId;
|
||||
private long fullBlockReportLeaseId;
|
||||
private final SortedSet<Integer> blockReportSizes =
|
||||
Collections.synchronizedSortedSet(new TreeSet<>());
|
||||
private final int maxDataLength;
|
||||
|
@ -129,6 +130,7 @@ class BPServiceActor implements Runnable {
|
|||
dnConf.ibrInterval,
|
||||
dn.getMetrics());
|
||||
prevBlockReportId = ThreadLocalRandom.current().nextLong();
|
||||
fullBlockReportLeaseId = 0;
|
||||
scheduler = new Scheduler(dnConf.heartBeatInterval,
|
||||
dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
|
||||
dnConf.outliersReportIntervalMs);
|
||||
|
@ -616,7 +618,6 @@ class BPServiceActor implements Runnable {
|
|||
+ "; heartBeatInterval=" + dnConf.heartBeatInterval
|
||||
+ (lifelineSender != null ?
|
||||
"; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : ""));
|
||||
long fullBlockReportLeaseId = 0;
|
||||
|
||||
//
|
||||
// Now loop for a long time....
|
||||
|
@ -783,6 +784,10 @@ class BPServiceActor implements Runnable {
|
|||
LOG.info("Block pool " + this + " successfully registered with NN");
|
||||
bpos.registrationSucceeded(this, bpRegistration);
|
||||
|
||||
// reset lease id whenever registered to NN.
|
||||
// ask for a new lease id at the next heartbeat.
|
||||
fullBlockReportLeaseId = 0;
|
||||
|
||||
// random short delay - helps scatter the BR from all DNs
|
||||
scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs);
|
||||
}
|
||||
|
|
|
@ -27,12 +27,12 @@ import static org.junit.Assert.assertTrue;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
@ -63,6 +63,8 @@ import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
|||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
|
||||
|
@ -92,6 +94,9 @@ public class TestBPOfferService {
|
|||
private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestBPOfferService.class);
|
||||
private long firstCallTime = 0;
|
||||
private long secondCallTime = 0;
|
||||
private long firstLeaseId = 0;
|
||||
private long secondLeaseId = 0;
|
||||
private long nextFullBlockReportLeaseId = 1L;
|
||||
|
||||
static {
|
||||
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
|
||||
|
@ -171,16 +176,24 @@ public class TestBPOfferService {
|
|||
private class HeartbeatAnswer implements Answer<HeartbeatResponse> {
|
||||
private final int nnIdx;
|
||||
|
||||
public HeartbeatAnswer(int nnIdx) {
|
||||
HeartbeatAnswer(int nnIdx) {
|
||||
this.nnIdx = nnIdx;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
|
||||
public HeartbeatResponse answer(InvocationOnMock invocation)
|
||||
throws Throwable {
|
||||
heartbeatCounts[nnIdx]++;
|
||||
Boolean requestFullBlockReportLease =
|
||||
(Boolean) invocation.getArguments()[8];
|
||||
long fullBlockReportLeaseId = 0;
|
||||
if (requestFullBlockReportLease) {
|
||||
fullBlockReportLeaseId = nextFullBlockReportLeaseId++;
|
||||
}
|
||||
LOG.info("fullBlockReportLeaseId=" + fullBlockReportLeaseId);
|
||||
HeartbeatResponse heartbeatResponse = new HeartbeatResponse(
|
||||
datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null,
|
||||
ThreadLocalRandom.current().nextLong() | 1L);
|
||||
fullBlockReportLeaseId);
|
||||
//reset the command
|
||||
datanodeCommands[nnIdx] = new DatanodeCommand[0];
|
||||
return heartbeatResponse;
|
||||
|
@ -188,6 +201,24 @@ public class TestBPOfferService {
|
|||
}
|
||||
|
||||
|
||||
private class HeartbeatRegisterAnswer implements Answer<HeartbeatResponse> {
|
||||
private final int nnIdx;
|
||||
|
||||
HeartbeatRegisterAnswer(int nnIdx) {
|
||||
this.nnIdx = nnIdx;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HeartbeatResponse answer(InvocationOnMock invocation)
|
||||
throws Throwable {
|
||||
heartbeatCounts[nnIdx]++;
|
||||
DatanodeCommand[] cmds = new DatanodeCommand[1];
|
||||
cmds[0] = new RegisterCommand();
|
||||
return new HeartbeatResponse(cmds, mockHaStatuses[nnIdx],
|
||||
null, 0L);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the BPOS can register to talk to two different NNs,
|
||||
* sends block reports to both, etc.
|
||||
|
@ -523,6 +554,26 @@ public class TestBPOfferService {
|
|||
}, 500, 10000);
|
||||
}
|
||||
|
||||
private void waitForRegistration(
|
||||
final DatanodeProtocolClientSideTranslatorPB mockNN, int times)
|
||||
throws Exception {
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
try {
|
||||
// The DN should have register to both NNs.
|
||||
// first called by connectToNNAndHandshake, then called by reRegister.
|
||||
Mockito.verify(mockNN, Mockito.times(2))
|
||||
.registerDatanode(Mockito.any());
|
||||
return true;
|
||||
} catch (Throwable t) {
|
||||
LOG.info("waiting on block registerDatanode: " + t.getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}, 500, 10000);
|
||||
}
|
||||
|
||||
private ReceivedDeletedBlockInfo[] waitForBlockReceived(
|
||||
final ExtendedBlock fakeBlock,
|
||||
final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
|
||||
|
@ -866,7 +917,7 @@ public class TestBPOfferService {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 30000)
|
||||
public void testRefreshNameNodes() throws Exception {
|
||||
|
||||
BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
|
||||
|
@ -935,4 +986,68 @@ public class TestBPOfferService {
|
|||
bpos.join();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
public void testRefreshLeaseId() throws Exception {
|
||||
Mockito.when(mockNN1.sendHeartbeat(
|
||||
Mockito.any(DatanodeRegistration.class),
|
||||
Mockito.any(StorageReport[].class),
|
||||
Mockito.anyLong(),
|
||||
Mockito.anyLong(),
|
||||
Mockito.anyInt(),
|
||||
Mockito.anyInt(),
|
||||
Mockito.anyInt(),
|
||||
Mockito.any(VolumeFailureSummary.class),
|
||||
Mockito.anyBoolean(),
|
||||
Mockito.any(SlowPeerReports.class),
|
||||
Mockito.any(SlowDiskReports.class)))
|
||||
//heartbeat to old NN instance
|
||||
.thenAnswer(new HeartbeatAnswer(0))
|
||||
//heartbeat to new NN instance with Register Command
|
||||
.thenAnswer(new HeartbeatRegisterAnswer(0))
|
||||
.thenAnswer(new HeartbeatAnswer(0));
|
||||
|
||||
Mockito.when(mockNN1.blockReport(
|
||||
Mockito.any(DatanodeRegistration.class),
|
||||
Mockito.anyString(),
|
||||
Mockito.any(StorageBlockReport[].class),
|
||||
Mockito.any(BlockReportContext.class)))
|
||||
.thenAnswer(
|
||||
new Answer() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation)
|
||||
throws Throwable {
|
||||
BlockReportContext context =
|
||||
(BlockReportContext) invocation.getArguments()[3];
|
||||
long leaseId = context.getLeaseId();
|
||||
LOG.info("leaseId = "+leaseId);
|
||||
|
||||
// leaseId == 1 means DN make block report with old leaseId
|
||||
// just reject and wait until DN request for a new leaseId
|
||||
if(leaseId == 1) {
|
||||
firstLeaseId = leaseId;
|
||||
throw new ConnectException(
|
||||
"network is not reachable for test. ");
|
||||
} else {
|
||||
secondLeaseId = leaseId;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
BPOfferService bpos = setupBPOSForNNs(mockNN1);
|
||||
bpos.start();
|
||||
|
||||
try {
|
||||
waitForInitialization(bpos);
|
||||
// Should call registration 2 times
|
||||
waitForRegistration(mockNN1, 2);
|
||||
assertEquals(1L, firstLeaseId);
|
||||
while(secondLeaseId != 2L) {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
} finally {
|
||||
bpos.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue