HDFS-9726. Refactor IBR code to a new class.

This commit is contained in:
Tsz-Wo Nicholas Sze 2016-02-05 23:17:59 +08:00
parent fdb89ac0ae
commit 494b6c7c4b
6 changed files with 285 additions and 279 deletions

View File

@ -965,6 +965,8 @@ Release 2.8.0 - UNRELEASED
HDFS-9715. Check storage ID uniqueness on datanode startup
(Lei (Eddy) Xu via vinayakumarb)
HDFS-9726. Refactor IBR code to a new class. (szetszwo)
BUG FIXES
HDFS-8091: ACLStatus and XAttributes should be presented to

View File

@ -233,14 +233,27 @@ void reportBadBlocks(ExtendedBlock block,
*/
void notifyNamenodeReceivedBlock(
ExtendedBlock block, String delHint, String storageUuid) {
notifyNamenodeBlock(block, BlockStatus.RECEIVED_BLOCK, delHint,
storageUuid);
}
void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
notifyNamenodeBlock(block, BlockStatus.RECEIVING_BLOCK, null, storageUuid);
}
void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
notifyNamenodeBlock(block, BlockStatus.DELETED_BLOCK, null, storageUuid);
}
private void notifyNamenodeBlock(ExtendedBlock block, BlockStatus status,
String delHint, String storageUuid) {
checkBlock(block);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(),
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
delHint);
final ReceivedDeletedBlockInfo info = new ReceivedDeletedBlockInfo(
block.getLocalBlock(), status, delHint);
final DatanodeStorage storage = dn.getFSDataset().getStorage(storageUuid);
for (BPServiceActor actor : bpServices) {
actor.notifyNamenodeBlock(bInfo, storageUuid, true);
actor.getIbrManager().notifyNamenodeBlock(info, storage);
}
}
@ -252,26 +265,6 @@ private void checkBlock(ExtendedBlock block) {
block.getBlockPoolId(), getBlockPoolId());
}
void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
checkBlock(block);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null);
for (BPServiceActor actor : bpServices) {
actor.notifyNamenodeDeletedBlock(bInfo, storageUuid);
}
}
void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
checkBlock(block);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null);
for (BPServiceActor actor : bpServices) {
actor.notifyNamenodeBlock(bInfo, storageUuid, false);
}
}
//This must be called only by blockPoolManager
void start() {
for (BPServiceActor actor : bpServices) {
@ -577,7 +570,7 @@ void triggerBlockReportForTests() throws IOException {
@VisibleForTesting
void triggerDeletionReportForTests() throws IOException {
for (BPServiceActor actor : bpServices) {
actor.triggerDeletionReportForTests();
actor.getIbrManager().triggerDeletionReportForTests();
}
}

View File

@ -31,7 +31,6 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Joiner;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
@ -51,9 +50,7 @@
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
@ -61,10 +58,10 @@
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
import org.slf4j.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import com.google.common.base.Joiner;
/**
* A thread per active or standby namenode to perform:
@ -95,25 +92,14 @@ static enum RunningState {
}
private volatile RunningState runningState = RunningState.CONNECTING;
/**
* Between block reports (which happen on the order of once an hour) the
* DN reports smaller incremental changes to its block list. This map,
* keyed by block ID, contains the pending changes which have yet to be
* reported to the NN. Access should be synchronized on this object.
*/
private final Map<DatanodeStorage, PerStoragePendingIncrementalBR>
pendingIncrementalBRperStorage = Maps.newHashMap();
// IBR = Incremental Block Report. If this flag is set then an IBR will be
// sent immediately by the actor thread without waiting for the IBR timer
// to elapse.
private volatile boolean sendImmediateIBR = false;
private volatile boolean shouldServiceRun = true;
private final DataNode dn;
private final DNConf dnConf;
private long prevBlockReportId;
private final IncrementalBlockReportManager ibrManager
= new IncrementalBlockReportManager();
private DatanodeRegistration bpRegistration;
final LinkedList<BPServiceActorAction> bpThreadQueue
= new LinkedList<BPServiceActorAction>();
@ -131,6 +117,10 @@ public DatanodeRegistration getBpRegistration() {
return bpRegistration;
}
IncrementalBlockReportManager getIbrManager() {
return ibrManager;
}
boolean isAlive() {
if (!shouldServiceRun || !bpThread.isAlive()) {
return false;
@ -231,141 +221,20 @@ private void connectToNNAndHandshake() throws IOException {
register(nsInfo);
}
/**
* Report received blocks and delete hints to the Namenode for each
* storage.
*
* @throws IOException
*/
private void reportReceivedDeletedBlocks() throws IOException {
// Generate a list of the pending reports for each storage under the lock
ArrayList<StorageReceivedDeletedBlocks> reports =
new ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size());
synchronized (pendingIncrementalBRperStorage) {
for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
pendingIncrementalBRperStorage.entrySet()) {
final DatanodeStorage storage = entry.getKey();
final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
if (perStorageMap.getBlockInfoCount() > 0) {
// Send newly-received and deleted blockids to namenode
ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
reports.add(new StorageReceivedDeletedBlocks(storage, rdbi));
}
}
sendImmediateIBR = false;
}
if (reports.size() == 0) {
// Nothing new to report.
return;
}
// Send incremental block reports to the Namenode outside the lock
boolean success = false;
final long startTime = monotonicNow();
try {
bpNamenode.blockReceivedAndDeleted(bpRegistration,
bpos.getBlockPoolId(),
reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
success = true;
} finally {
dn.getMetrics().addIncrementalBlockReport(monotonicNow() - startTime);
if (!success) {
synchronized (pendingIncrementalBRperStorage) {
for (StorageReceivedDeletedBlocks report : reports) {
// If we didn't succeed in sending the report, put all of the
// blocks back onto our queue, but only in the case where we
// didn't put something newer in the meantime.
PerStoragePendingIncrementalBR perStorageMap =
pendingIncrementalBRperStorage.get(report.getStorage());
perStorageMap.putMissingBlockInfos(report.getBlocks());
sendImmediateIBR = true;
}
}
}
}
}
/**
* @return pending incremental block report for given {@code storage}
*/
private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(
DatanodeStorage storage) {
PerStoragePendingIncrementalBR mapForStorage =
pendingIncrementalBRperStorage.get(storage);
if (mapForStorage == null) {
// This is the first time we are adding incremental BR state for
// this storage so create a new map. This is required once per
// storage, per service actor.
mapForStorage = new PerStoragePendingIncrementalBR();
pendingIncrementalBRperStorage.put(storage, mapForStorage);
}
return mapForStorage;
}
/**
* Add a blockInfo for notification to NameNode. If another entry
* exists for the same block it is removed.
*
* Caller must synchronize access using pendingIncrementalBRperStorage.
*/
void addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo bInfo,
DatanodeStorage storage) {
// Make sure another entry for the same block is first removed.
// There may only be one such entry.
for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
pendingIncrementalBRperStorage.entrySet()) {
if (entry.getValue().removeBlockInfo(bInfo)) {
break;
}
}
getIncrementalBRMapForStorage(storage).putBlockInfo(bInfo);
}
/*
* Informing the name node could take a long long time! Should we wait
* till namenode is informed before responding with success to the
* client? For now we don't.
*/
void notifyNamenodeBlock(ReceivedDeletedBlockInfo bInfo,
String storageUuid, boolean now) {
synchronized (pendingIncrementalBRperStorage) {
addPendingReplicationBlockInfo(
bInfo, dn.getFSDataset().getStorage(storageUuid));
sendImmediateIBR = true;
// If now is true, the report is sent right away.
// Otherwise, it will be sent out in the next heartbeat.
if (now) {
pendingIncrementalBRperStorage.notifyAll();
}
}
}
void notifyNamenodeDeletedBlock(
ReceivedDeletedBlockInfo bInfo, String storageUuid) {
synchronized (pendingIncrementalBRperStorage) {
addPendingReplicationBlockInfo(
bInfo, dn.getFSDataset().getStorage(storageUuid));
}
}
/**
* Run an immediate block report on this thread. Used by tests.
*/
@VisibleForTesting
void triggerBlockReportForTests() {
synchronized (pendingIncrementalBRperStorage) {
synchronized (ibrManager) {
scheduler.scheduleHeartbeat();
long oldBlockReportTime = scheduler.nextBlockReportTime;
scheduler.forceFullBlockReportNow();
pendingIncrementalBRperStorage.notifyAll();
ibrManager.notifyAll();
while (oldBlockReportTime == scheduler.nextBlockReportTime) {
try {
pendingIncrementalBRperStorage.wait(100);
ibrManager.wait(100);
} catch (InterruptedException e) {
return;
}
@ -375,12 +244,12 @@ void triggerBlockReportForTests() {
@VisibleForTesting
void triggerHeartbeatForTests() {
synchronized (pendingIncrementalBRperStorage) {
synchronized (ibrManager) {
final long nextHeartbeatTime = scheduler.scheduleHeartbeat();
pendingIncrementalBRperStorage.notifyAll();
ibrManager.notifyAll();
while (nextHeartbeatTime - scheduler.nextHeartbeatTime >= 0) {
try {
pendingIncrementalBRperStorage.wait(100);
ibrManager.wait(100);
} catch (InterruptedException e) {
return;
}
@ -388,27 +257,6 @@ void triggerHeartbeatForTests() {
}
}
@VisibleForTesting
void triggerDeletionReportForTests() {
synchronized (pendingIncrementalBRperStorage) {
sendImmediateIBR = true;
pendingIncrementalBRperStorage.notifyAll();
while (sendImmediateIBR) {
try {
pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) {
return;
}
}
}
}
@VisibleForTesting
boolean hasPendingIBR() {
return sendImmediateIBR;
}
private long generateUniqueBlockReportId() {
// Initialize the block report ID the first time through.
// Note that 0 is used on the NN to indicate "uninitialized", so we should
@ -432,7 +280,8 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
// we have a chance that we will miss the delHint information
// or we will report an RBW replica after the BlockReport already reports
// a FINALIZED one.
reportReceivedDeletedBlocks();
ibrManager.sendIBRs(bpNamenode, bpRegistration,
bpos.getBlockPoolId(), dn.getMetrics());
long brCreateStartTime = monotonicNow();
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
@ -697,8 +546,9 @@ private void offerService() throws Exception {
}
}
}
if (sendImmediateIBR || sendHeartbeat) {
reportReceivedDeletedBlocks();
if (ibrManager.sendImmediately() || sendHeartbeat) {
ibrManager.sendIBRs(bpNamenode, bpRegistration,
bpos.getBlockPoolId(), dn.getMetrics());
}
List<DatanodeCommand> cmds = null;
@ -723,10 +573,10 @@ private void offerService() throws Exception {
// or work arrives, and then iterate again.
//
long waitTime = scheduler.getHeartbeatWaitTime();
synchronized(pendingIncrementalBRperStorage) {
if (waitTime > 0 && !sendImmediateIBR) {
synchronized(ibrManager) {
if (waitTime > 0 && !ibrManager.sendImmediately()) {
try {
pendingIncrementalBRperStorage.wait(waitTime);
ibrManager.wait(waitTime);
} catch (InterruptedException ie) {
LOG.warn("BPOfferService for " + this + " interrupted");
}
@ -915,82 +765,15 @@ void reRegister() throws IOException {
}
}
private static class PerStoragePendingIncrementalBR {
private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR =
Maps.newHashMap();
/**
* Return the number of blocks on this storage that have pending
* incremental block reports.
* @return
*/
int getBlockInfoCount() {
return pendingIncrementalBR.size();
}
/**
* Dequeue and return all pending incremental block report state.
* @return
*/
ReceivedDeletedBlockInfo[] dequeueBlockInfos() {
ReceivedDeletedBlockInfo[] blockInfos =
pendingIncrementalBR.values().toArray(
new ReceivedDeletedBlockInfo[getBlockInfoCount()]);
pendingIncrementalBR.clear();
return blockInfos;
}
/**
* Add blocks from blockArray to pendingIncrementalBR, unless the
* block already exists in pendingIncrementalBR.
* @param blockArray list of blocks to add.
* @return the number of missing blocks that we added.
*/
int putMissingBlockInfos(ReceivedDeletedBlockInfo[] blockArray) {
int blocksPut = 0;
for (ReceivedDeletedBlockInfo rdbi : blockArray) {
if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
++blocksPut;
}
}
return blocksPut;
}
/**
* Add pending incremental block report for a single block.
* @param blockInfo
*/
void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
pendingIncrementalBR.put(blockInfo.getBlock().getBlockId(), blockInfo);
}
/**
* Remove pending incremental block report for a single block if it
* exists.
*
* @param blockInfo
* @return true if a report was removed, false if no report existed for
* the given block.
*/
boolean removeBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
return (pendingIncrementalBR.remove(blockInfo.getBlock().getBlockId()) != null);
}
}
void triggerBlockReport(BlockReportOptions options) throws IOException {
void triggerBlockReport(BlockReportOptions options) {
if (options.isIncremental()) {
LOG.info(bpos.toString() + ": scheduling an incremental block report.");
synchronized(pendingIncrementalBRperStorage) {
sendImmediateIBR = true;
pendingIncrementalBRperStorage.notifyAll();
}
ibrManager.triggerIBR();
} else {
LOG.info(bpos.toString() + ": scheduling a full block report.");
synchronized(pendingIncrementalBRperStorage) {
synchronized(ibrManager) {
scheduler.forceFullBlockReportNow();
pendingIncrementalBRperStorage.notifyAll();
ibrManager.notifyAll();
}
}
}

View File

@ -0,0 +1,224 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
/**
* Manage Incremental Block Reports (IBRs).
*/
@InterfaceAudience.Private
class IncrementalBlockReportManager {
private static class PerStorageIBR {
/** The blocks in this IBR. */
final Map<Block, ReceivedDeletedBlockInfo> blocks = Maps.newHashMap();
/**
* Remove the given block from this IBR
* @return true if the block was removed; otherwise, return false.
*/
ReceivedDeletedBlockInfo remove(Block block) {
return blocks.remove(block);
}
/** @return all the blocks removed from this IBR. */
ReceivedDeletedBlockInfo[] removeAll() {
final int size = blocks.size();
if (size == 0) {
return null;
}
final ReceivedDeletedBlockInfo[] rdbis = blocks.values().toArray(
new ReceivedDeletedBlockInfo[size]);
blocks.clear();
return rdbis;
}
/** Put the block to this IBR. */
void put(ReceivedDeletedBlockInfo rdbi) {
blocks.put(rdbi.getBlock(), rdbi);
}
/**
* Put the all blocks to this IBR unless the block already exists.
* @param rdbis list of blocks to add.
* @return the number of missing blocks added.
*/
int putMissing(ReceivedDeletedBlockInfo[] rdbis) {
int count = 0;
for (ReceivedDeletedBlockInfo rdbi : rdbis) {
if (!blocks.containsKey(rdbi.getBlock())) {
put(rdbi);
count++;
}
}
return count;
}
}
/**
* Between block reports (which happen on the order of once an hour) the
* DN reports smaller incremental changes to its block list for each storage.
* This map contains the pending changes not yet to be reported to the NN.
*/
private final Map<DatanodeStorage, PerStorageIBR> pendingIBRs
= Maps.newHashMap();
/**
* If this flag is set then an IBR will be sent immediately by the actor
* thread without waiting for the IBR timer to elapse.
*/
private volatile boolean readyToSend = false;
boolean sendImmediately() {
return readyToSend;
}
private synchronized StorageReceivedDeletedBlocks[] generateIBRs() {
final List<StorageReceivedDeletedBlocks> reports
= new ArrayList<>(pendingIBRs.size());
for (Map.Entry<DatanodeStorage, PerStorageIBR> entry
: pendingIBRs.entrySet()) {
final PerStorageIBR perStorage = entry.getValue();
// Send newly-received and deleted blockids to namenode
final ReceivedDeletedBlockInfo[] rdbi = perStorage.removeAll();
if (rdbi != null) {
reports.add(new StorageReceivedDeletedBlocks(entry.getKey(), rdbi));
}
}
readyToSend = false;
return reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]);
}
private synchronized void putMissing(StorageReceivedDeletedBlocks[] reports) {
for (StorageReceivedDeletedBlocks r : reports) {
pendingIBRs.get(r.getStorage()).putMissing(r.getBlocks());
}
if (reports.length > 0) {
readyToSend = true;
}
}
/** Send IBRs to namenode. */
void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration,
String bpid, DataNodeMetrics metrics) throws IOException {
// Generate a list of the pending reports for each storage under the lock
final StorageReceivedDeletedBlocks[] reports = generateIBRs();
if (reports.length == 0) {
// Nothing new to report.
return;
}
// Send incremental block reports to the Namenode outside the lock
boolean success = false;
final long startTime = monotonicNow();
try {
namenode.blockReceivedAndDeleted(registration, bpid, reports);
success = true;
} finally {
metrics.addIncrementalBlockReport(monotonicNow() - startTime);
if (!success) {
// If we didn't succeed in sending the report, put all of the
// blocks back onto our queue, but only in the case where we
// didn't put something newer in the meantime.
putMissing(reports);
}
}
}
/** @return the pending IBR for the given {@code storage} */
private PerStorageIBR getPerStorageIBR(DatanodeStorage storage) {
PerStorageIBR perStorage = pendingIBRs.get(storage);
if (perStorage == null) {
// This is the first time we are adding incremental BR state for
// this storage so create a new map. This is required once per
// storage, per service actor.
perStorage = new PerStorageIBR();
pendingIBRs.put(storage, perStorage);
}
return perStorage;
}
/**
* Add a block for notification to NameNode.
* If another entry exists for the same block it is removed.
*/
@VisibleForTesting
synchronized void addRDBI(ReceivedDeletedBlockInfo rdbi,
DatanodeStorage storage) {
// Make sure another entry for the same block is first removed.
// There may only be one such entry.
for (PerStorageIBR perStorage : pendingIBRs.values()) {
if (perStorage.remove(rdbi.getBlock()) != null) {
break;
}
}
getPerStorageIBR(storage).put(rdbi);
}
synchronized void notifyNamenodeBlock(ReceivedDeletedBlockInfo rdbi,
DatanodeStorage storage) {
addRDBI(rdbi, storage);
final BlockStatus status = rdbi.getStatus();
if (status == BlockStatus.RECEIVING_BLOCK) {
// the report will be sent out in the next heartbeat.
readyToSend = true;
} else if (status == BlockStatus.RECEIVED_BLOCK) {
// the report is sent right away.
triggerIBR();
}
}
synchronized void triggerIBR() {
readyToSend = true;
notifyAll();
}
@VisibleForTesting
synchronized void triggerDeletionReportForTests() {
triggerIBR();
while (sendImmediately()) {
try {
wait(100);
} catch (InterruptedException e) {
return;
}
}
}
}

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
@ -55,7 +56,6 @@ public class TestIncrementalBlockReports {
private static final long DUMMY_BLOCK_GENSTAMP = 1000;
private MiniDFSCluster cluster = null;
private DistributedFileSystem fs;
private Configuration conf;
private NameNode singletonNn;
private DataNode singletonDn;
@ -67,7 +67,6 @@ public class TestIncrementalBlockReports {
public void startCluster() throws IOException {
conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DN_COUNT).build();
fs = cluster.getFileSystem();
singletonNn = cluster.getNameNode();
singletonDn = cluster.getDataNodes().get(0);
bpos = singletonDn.getAllBpOs().get(0);
@ -88,7 +87,8 @@ private static Block getDummyBlock() {
private void injectBlockReceived() {
ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
getDummyBlock(), BlockStatus.RECEIVED_BLOCK, null);
actor.notifyNamenodeBlock(rdbi, storageUuid, true);
DatanodeStorage s = singletonDn.getFSDataset().getStorage(storageUuid);
actor.getIbrManager().notifyNamenodeBlock(rdbi, s);
}
/**
@ -97,7 +97,8 @@ private void injectBlockReceived() {
private void injectBlockDeleted() {
ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
getDummyBlock(), BlockStatus.DELETED_BLOCK, null);
actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);
actor.getIbrManager().addRDBI(rdbi,
singletonDn.getFSDataset().getStorage(storageUuid));
}
/**
@ -206,7 +207,7 @@ public void testReplaceReceivedBlock() throws InterruptedException, IOException
any(StorageReceivedDeletedBlocks[].class));
// Ensure that no more IBRs are pending.
assertFalse(actor.hasPendingIBR());
assertFalse(actor.getIbrManager().sendImmediately());
} finally {
cluster.shutdown();

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@ -93,12 +94,14 @@ private void testTriggerBlockReport(boolean incremental) throws Exception {
DataNode datanode = cluster.getDataNodes().get(0);
BPServiceActor actor =
datanode.getAllBpOs().get(0).getBPServiceActors().get(0);
String storageUuid;
final FsDatasetSpi<?> dataset = datanode.getFSDataset();
final DatanodeStorage storage;
try (FsDatasetSpi.FsVolumeReferences volumes =
datanode.getFSDataset().getFsVolumeReferences()) {
storageUuid = volumes.get(0).getStorageID();
dataset.getFsVolumeReferences()) {
storage = dataset.getStorage(volumes.get(0).getStorageID());
}
actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);
actor.getIbrManager().addRDBI(rdbi, storage);
// Manually trigger a block report.
datanode.triggerBlockReport(