HDFS-9726. Refactor IBR code to a new class.
This commit is contained in:
parent
1bcfab8e7f
commit
4e5e1c0f99
|
@ -1952,6 +1952,8 @@ Release 2.8.0 - UNRELEASED
|
|||
HDFS-9715. Check storage ID uniqueness on datanode startup
|
||||
(Lei (Eddy) Xu via vinayakumarb)
|
||||
|
||||
HDFS-9726. Refactor IBR code to a new class. (szetszwo)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
HDFS-7501. TransactionsSinceLastCheckpoint can be negative on SBNs.
|
||||
|
|
|
@ -235,14 +235,27 @@ class BPOfferService {
|
|||
*/
|
||||
void notifyNamenodeReceivedBlock(
|
||||
ExtendedBlock block, String delHint, String storageUuid) {
|
||||
notifyNamenodeBlock(block, BlockStatus.RECEIVED_BLOCK, delHint,
|
||||
storageUuid);
|
||||
}
|
||||
|
||||
void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
|
||||
notifyNamenodeBlock(block, BlockStatus.RECEIVING_BLOCK, null, storageUuid);
|
||||
}
|
||||
|
||||
void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
|
||||
notifyNamenodeBlock(block, BlockStatus.DELETED_BLOCK, null, storageUuid);
|
||||
}
|
||||
|
||||
private void notifyNamenodeBlock(ExtendedBlock block, BlockStatus status,
|
||||
String delHint, String storageUuid) {
|
||||
checkBlock(block);
|
||||
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
|
||||
block.getLocalBlock(),
|
||||
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
|
||||
delHint);
|
||||
final ReceivedDeletedBlockInfo info = new ReceivedDeletedBlockInfo(
|
||||
block.getLocalBlock(), status, delHint);
|
||||
final DatanodeStorage storage = dn.getFSDataset().getStorage(storageUuid);
|
||||
|
||||
for (BPServiceActor actor : bpServices) {
|
||||
actor.notifyNamenodeBlock(bInfo, storageUuid, true);
|
||||
actor.getIbrManager().notifyNamenodeBlock(info, storage);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -254,26 +267,6 @@ class BPOfferService {
|
|||
block.getBlockPoolId(), getBlockPoolId());
|
||||
}
|
||||
|
||||
void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
|
||||
checkBlock(block);
|
||||
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
|
||||
block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null);
|
||||
|
||||
for (BPServiceActor actor : bpServices) {
|
||||
actor.notifyNamenodeDeletedBlock(bInfo, storageUuid);
|
||||
}
|
||||
}
|
||||
|
||||
void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
|
||||
checkBlock(block);
|
||||
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
|
||||
block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null);
|
||||
|
||||
for (BPServiceActor actor : bpServices) {
|
||||
actor.notifyNamenodeBlock(bInfo, storageUuid, false);
|
||||
}
|
||||
}
|
||||
|
||||
//This must be called only by blockPoolManager
|
||||
void start() {
|
||||
for (BPServiceActor actor : bpServices) {
|
||||
|
@ -578,7 +571,7 @@ class BPOfferService {
|
|||
@VisibleForTesting
|
||||
void triggerDeletionReportForTests() throws IOException {
|
||||
for (BPServiceActor actor : bpServices) {
|
||||
actor.triggerDeletionReportForTests();
|
||||
actor.getIbrManager().triggerDeletionReportForTests();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ import java.util.Map;
|
|||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
||||
|
@ -51,9 +50,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|||
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
|
||||
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
@ -61,10 +58,10 @@ import org.apache.hadoop.ipc.RemoteException;
|
|||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
import org.apache.hadoop.util.VersionUtil;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.slf4j.Logger;
|
||||
import com.google.common.base.Joiner;
|
||||
|
||||
/**
|
||||
* A thread per active or standby namenode to perform:
|
||||
|
@ -95,25 +92,14 @@ class BPServiceActor implements Runnable {
|
|||
}
|
||||
|
||||
private volatile RunningState runningState = RunningState.CONNECTING;
|
||||
|
||||
/**
|
||||
* Between block reports (which happen on the order of once an hour) the
|
||||
* DN reports smaller incremental changes to its block list. This map,
|
||||
* keyed by block ID, contains the pending changes which have yet to be
|
||||
* reported to the NN. Access should be synchronized on this object.
|
||||
*/
|
||||
private final Map<DatanodeStorage, PerStoragePendingIncrementalBR>
|
||||
pendingIncrementalBRperStorage = Maps.newHashMap();
|
||||
|
||||
// IBR = Incremental Block Report. If this flag is set then an IBR will be
|
||||
// sent immediately by the actor thread without waiting for the IBR timer
|
||||
// to elapse.
|
||||
private volatile boolean sendImmediateIBR = false;
|
||||
private volatile boolean shouldServiceRun = true;
|
||||
private final DataNode dn;
|
||||
private final DNConf dnConf;
|
||||
private long prevBlockReportId;
|
||||
|
||||
private final IncrementalBlockReportManager ibrManager
|
||||
= new IncrementalBlockReportManager();
|
||||
|
||||
private DatanodeRegistration bpRegistration;
|
||||
final LinkedList<BPServiceActorAction> bpThreadQueue
|
||||
= new LinkedList<BPServiceActorAction>();
|
||||
|
@ -131,6 +117,10 @@ class BPServiceActor implements Runnable {
|
|||
return bpRegistration;
|
||||
}
|
||||
|
||||
IncrementalBlockReportManager getIbrManager() {
|
||||
return ibrManager;
|
||||
}
|
||||
|
||||
boolean isAlive() {
|
||||
if (!shouldServiceRun || !bpThread.isAlive()) {
|
||||
return false;
|
||||
|
@ -231,141 +221,20 @@ class BPServiceActor implements Runnable {
|
|||
register(nsInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* Report received blocks and delete hints to the Namenode for each
|
||||
* storage.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private void reportReceivedDeletedBlocks() throws IOException {
|
||||
|
||||
// Generate a list of the pending reports for each storage under the lock
|
||||
ArrayList<StorageReceivedDeletedBlocks> reports =
|
||||
new ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size());
|
||||
synchronized (pendingIncrementalBRperStorage) {
|
||||
for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
|
||||
pendingIncrementalBRperStorage.entrySet()) {
|
||||
final DatanodeStorage storage = entry.getKey();
|
||||
final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
|
||||
|
||||
if (perStorageMap.getBlockInfoCount() > 0) {
|
||||
// Send newly-received and deleted blockids to namenode
|
||||
ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
|
||||
reports.add(new StorageReceivedDeletedBlocks(storage, rdbi));
|
||||
}
|
||||
}
|
||||
sendImmediateIBR = false;
|
||||
}
|
||||
|
||||
if (reports.size() == 0) {
|
||||
// Nothing new to report.
|
||||
return;
|
||||
}
|
||||
|
||||
// Send incremental block reports to the Namenode outside the lock
|
||||
boolean success = false;
|
||||
final long startTime = monotonicNow();
|
||||
try {
|
||||
bpNamenode.blockReceivedAndDeleted(bpRegistration,
|
||||
bpos.getBlockPoolId(),
|
||||
reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
|
||||
success = true;
|
||||
} finally {
|
||||
dn.getMetrics().addIncrementalBlockReport(monotonicNow() - startTime);
|
||||
if (!success) {
|
||||
synchronized (pendingIncrementalBRperStorage) {
|
||||
for (StorageReceivedDeletedBlocks report : reports) {
|
||||
// If we didn't succeed in sending the report, put all of the
|
||||
// blocks back onto our queue, but only in the case where we
|
||||
// didn't put something newer in the meantime.
|
||||
PerStoragePendingIncrementalBR perStorageMap =
|
||||
pendingIncrementalBRperStorage.get(report.getStorage());
|
||||
perStorageMap.putMissingBlockInfos(report.getBlocks());
|
||||
sendImmediateIBR = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return pending incremental block report for given {@code storage}
|
||||
*/
|
||||
private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(
|
||||
DatanodeStorage storage) {
|
||||
PerStoragePendingIncrementalBR mapForStorage =
|
||||
pendingIncrementalBRperStorage.get(storage);
|
||||
|
||||
if (mapForStorage == null) {
|
||||
// This is the first time we are adding incremental BR state for
|
||||
// this storage so create a new map. This is required once per
|
||||
// storage, per service actor.
|
||||
mapForStorage = new PerStoragePendingIncrementalBR();
|
||||
pendingIncrementalBRperStorage.put(storage, mapForStorage);
|
||||
}
|
||||
|
||||
return mapForStorage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a blockInfo for notification to NameNode. If another entry
|
||||
* exists for the same block it is removed.
|
||||
*
|
||||
* Caller must synchronize access using pendingIncrementalBRperStorage.
|
||||
*/
|
||||
void addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo bInfo,
|
||||
DatanodeStorage storage) {
|
||||
// Make sure another entry for the same block is first removed.
|
||||
// There may only be one such entry.
|
||||
for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
|
||||
pendingIncrementalBRperStorage.entrySet()) {
|
||||
if (entry.getValue().removeBlockInfo(bInfo)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
getIncrementalBRMapForStorage(storage).putBlockInfo(bInfo);
|
||||
}
|
||||
|
||||
/*
|
||||
* Informing the name node could take a long long time! Should we wait
|
||||
* till namenode is informed before responding with success to the
|
||||
* client? For now we don't.
|
||||
*/
|
||||
void notifyNamenodeBlock(ReceivedDeletedBlockInfo bInfo,
|
||||
String storageUuid, boolean now) {
|
||||
synchronized (pendingIncrementalBRperStorage) {
|
||||
addPendingReplicationBlockInfo(
|
||||
bInfo, dn.getFSDataset().getStorage(storageUuid));
|
||||
sendImmediateIBR = true;
|
||||
// If now is true, the report is sent right away.
|
||||
// Otherwise, it will be sent out in the next heartbeat.
|
||||
if (now) {
|
||||
pendingIncrementalBRperStorage.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void notifyNamenodeDeletedBlock(
|
||||
ReceivedDeletedBlockInfo bInfo, String storageUuid) {
|
||||
synchronized (pendingIncrementalBRperStorage) {
|
||||
addPendingReplicationBlockInfo(
|
||||
bInfo, dn.getFSDataset().getStorage(storageUuid));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run an immediate block report on this thread. Used by tests.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
void triggerBlockReportForTests() {
|
||||
synchronized (pendingIncrementalBRperStorage) {
|
||||
synchronized (ibrManager) {
|
||||
scheduler.scheduleHeartbeat();
|
||||
long oldBlockReportTime = scheduler.nextBlockReportTime;
|
||||
scheduler.forceFullBlockReportNow();
|
||||
pendingIncrementalBRperStorage.notifyAll();
|
||||
ibrManager.notifyAll();
|
||||
while (oldBlockReportTime == scheduler.nextBlockReportTime) {
|
||||
try {
|
||||
pendingIncrementalBRperStorage.wait(100);
|
||||
ibrManager.wait(100);
|
||||
} catch (InterruptedException e) {
|
||||
return;
|
||||
}
|
||||
|
@ -375,12 +244,12 @@ class BPServiceActor implements Runnable {
|
|||
|
||||
@VisibleForTesting
|
||||
void triggerHeartbeatForTests() {
|
||||
synchronized (pendingIncrementalBRperStorage) {
|
||||
synchronized (ibrManager) {
|
||||
final long nextHeartbeatTime = scheduler.scheduleHeartbeat();
|
||||
pendingIncrementalBRperStorage.notifyAll();
|
||||
ibrManager.notifyAll();
|
||||
while (nextHeartbeatTime - scheduler.nextHeartbeatTime >= 0) {
|
||||
try {
|
||||
pendingIncrementalBRperStorage.wait(100);
|
||||
ibrManager.wait(100);
|
||||
} catch (InterruptedException e) {
|
||||
return;
|
||||
}
|
||||
|
@ -388,27 +257,6 @@ class BPServiceActor implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void triggerDeletionReportForTests() {
|
||||
synchronized (pendingIncrementalBRperStorage) {
|
||||
sendImmediateIBR = true;
|
||||
pendingIncrementalBRperStorage.notifyAll();
|
||||
|
||||
while (sendImmediateIBR) {
|
||||
try {
|
||||
pendingIncrementalBRperStorage.wait(100);
|
||||
} catch (InterruptedException e) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
boolean hasPendingIBR() {
|
||||
return sendImmediateIBR;
|
||||
}
|
||||
|
||||
private long generateUniqueBlockReportId() {
|
||||
// Initialize the block report ID the first time through.
|
||||
// Note that 0 is used on the NN to indicate "uninitialized", so we should
|
||||
|
@ -432,7 +280,8 @@ class BPServiceActor implements Runnable {
|
|||
// we have a chance that we will miss the delHint information
|
||||
// or we will report an RBW replica after the BlockReport already reports
|
||||
// a FINALIZED one.
|
||||
reportReceivedDeletedBlocks();
|
||||
ibrManager.sendIBRs(bpNamenode, bpRegistration,
|
||||
bpos.getBlockPoolId(), dn.getMetrics());
|
||||
|
||||
long brCreateStartTime = monotonicNow();
|
||||
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
|
||||
|
@ -697,8 +546,9 @@ class BPServiceActor implements Runnable {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (sendImmediateIBR || sendHeartbeat) {
|
||||
reportReceivedDeletedBlocks();
|
||||
if (ibrManager.sendImmediately() || sendHeartbeat) {
|
||||
ibrManager.sendIBRs(bpNamenode, bpRegistration,
|
||||
bpos.getBlockPoolId(), dn.getMetrics());
|
||||
}
|
||||
|
||||
List<DatanodeCommand> cmds = null;
|
||||
|
@ -723,10 +573,10 @@ class BPServiceActor implements Runnable {
|
|||
// or work arrives, and then iterate again.
|
||||
//
|
||||
long waitTime = scheduler.getHeartbeatWaitTime();
|
||||
synchronized(pendingIncrementalBRperStorage) {
|
||||
if (waitTime > 0 && !sendImmediateIBR) {
|
||||
synchronized(ibrManager) {
|
||||
if (waitTime > 0 && !ibrManager.sendImmediately()) {
|
||||
try {
|
||||
pendingIncrementalBRperStorage.wait(waitTime);
|
||||
ibrManager.wait(waitTime);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("BPOfferService for " + this + " interrupted");
|
||||
}
|
||||
|
@ -915,82 +765,15 @@ class BPServiceActor implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
private static class PerStoragePendingIncrementalBR {
|
||||
private final Map<Long, ReceivedDeletedBlockInfo> pendingIncrementalBR =
|
||||
Maps.newHashMap();
|
||||
|
||||
/**
|
||||
* Return the number of blocks on this storage that have pending
|
||||
* incremental block reports.
|
||||
* @return
|
||||
*/
|
||||
int getBlockInfoCount() {
|
||||
return pendingIncrementalBR.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Dequeue and return all pending incremental block report state.
|
||||
* @return
|
||||
*/
|
||||
ReceivedDeletedBlockInfo[] dequeueBlockInfos() {
|
||||
ReceivedDeletedBlockInfo[] blockInfos =
|
||||
pendingIncrementalBR.values().toArray(
|
||||
new ReceivedDeletedBlockInfo[getBlockInfoCount()]);
|
||||
|
||||
pendingIncrementalBR.clear();
|
||||
return blockInfos;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add blocks from blockArray to pendingIncrementalBR, unless the
|
||||
* block already exists in pendingIncrementalBR.
|
||||
* @param blockArray list of blocks to add.
|
||||
* @return the number of missing blocks that we added.
|
||||
*/
|
||||
int putMissingBlockInfos(ReceivedDeletedBlockInfo[] blockArray) {
|
||||
int blocksPut = 0;
|
||||
for (ReceivedDeletedBlockInfo rdbi : blockArray) {
|
||||
if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) {
|
||||
pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi);
|
||||
++blocksPut;
|
||||
}
|
||||
}
|
||||
return blocksPut;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add pending incremental block report for a single block.
|
||||
* @param blockInfo
|
||||
*/
|
||||
void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
|
||||
pendingIncrementalBR.put(blockInfo.getBlock().getBlockId(), blockInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove pending incremental block report for a single block if it
|
||||
* exists.
|
||||
*
|
||||
* @param blockInfo
|
||||
* @return true if a report was removed, false if no report existed for
|
||||
* the given block.
|
||||
*/
|
||||
boolean removeBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
|
||||
return (pendingIncrementalBR.remove(blockInfo.getBlock().getBlockId()) != null);
|
||||
}
|
||||
}
|
||||
|
||||
void triggerBlockReport(BlockReportOptions options) throws IOException {
|
||||
void triggerBlockReport(BlockReportOptions options) {
|
||||
if (options.isIncremental()) {
|
||||
LOG.info(bpos.toString() + ": scheduling an incremental block report.");
|
||||
synchronized(pendingIncrementalBRperStorage) {
|
||||
sendImmediateIBR = true;
|
||||
pendingIncrementalBRperStorage.notifyAll();
|
||||
}
|
||||
ibrManager.triggerIBR();
|
||||
} else {
|
||||
LOG.info(bpos.toString() + ": scheduling a full block report.");
|
||||
synchronized(pendingIncrementalBRperStorage) {
|
||||
synchronized(ibrManager) {
|
||||
scheduler.forceFullBlockReportNow();
|
||||
pendingIncrementalBRperStorage.notifyAll();
|
||||
ibrManager.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,224 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import static org.apache.hadoop.util.Time.monotonicNow;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
/**
|
||||
* Manage Incremental Block Reports (IBRs).
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class IncrementalBlockReportManager {
|
||||
private static class PerStorageIBR {
|
||||
/** The blocks in this IBR. */
|
||||
final Map<Block, ReceivedDeletedBlockInfo> blocks = Maps.newHashMap();
|
||||
|
||||
/**
|
||||
* Remove the given block from this IBR
|
||||
* @return true if the block was removed; otherwise, return false.
|
||||
*/
|
||||
ReceivedDeletedBlockInfo remove(Block block) {
|
||||
return blocks.remove(block);
|
||||
}
|
||||
|
||||
/** @return all the blocks removed from this IBR. */
|
||||
ReceivedDeletedBlockInfo[] removeAll() {
|
||||
final int size = blocks.size();
|
||||
if (size == 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final ReceivedDeletedBlockInfo[] rdbis = blocks.values().toArray(
|
||||
new ReceivedDeletedBlockInfo[size]);
|
||||
blocks.clear();
|
||||
return rdbis;
|
||||
}
|
||||
|
||||
/** Put the block to this IBR. */
|
||||
void put(ReceivedDeletedBlockInfo rdbi) {
|
||||
blocks.put(rdbi.getBlock(), rdbi);
|
||||
}
|
||||
|
||||
/**
|
||||
* Put the all blocks to this IBR unless the block already exists.
|
||||
* @param rdbis list of blocks to add.
|
||||
* @return the number of missing blocks added.
|
||||
*/
|
||||
int putMissing(ReceivedDeletedBlockInfo[] rdbis) {
|
||||
int count = 0;
|
||||
for (ReceivedDeletedBlockInfo rdbi : rdbis) {
|
||||
if (!blocks.containsKey(rdbi.getBlock())) {
|
||||
put(rdbi);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Between block reports (which happen on the order of once an hour) the
|
||||
* DN reports smaller incremental changes to its block list for each storage.
|
||||
* This map contains the pending changes not yet to be reported to the NN.
|
||||
*/
|
||||
private final Map<DatanodeStorage, PerStorageIBR> pendingIBRs
|
||||
= Maps.newHashMap();
|
||||
|
||||
/**
|
||||
* If this flag is set then an IBR will be sent immediately by the actor
|
||||
* thread without waiting for the IBR timer to elapse.
|
||||
*/
|
||||
private volatile boolean readyToSend = false;
|
||||
|
||||
boolean sendImmediately() {
|
||||
return readyToSend;
|
||||
}
|
||||
|
||||
private synchronized StorageReceivedDeletedBlocks[] generateIBRs() {
|
||||
final List<StorageReceivedDeletedBlocks> reports
|
||||
= new ArrayList<>(pendingIBRs.size());
|
||||
for (Map.Entry<DatanodeStorage, PerStorageIBR> entry
|
||||
: pendingIBRs.entrySet()) {
|
||||
final PerStorageIBR perStorage = entry.getValue();
|
||||
|
||||
// Send newly-received and deleted blockids to namenode
|
||||
final ReceivedDeletedBlockInfo[] rdbi = perStorage.removeAll();
|
||||
if (rdbi != null) {
|
||||
reports.add(new StorageReceivedDeletedBlocks(entry.getKey(), rdbi));
|
||||
}
|
||||
}
|
||||
readyToSend = false;
|
||||
return reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]);
|
||||
}
|
||||
|
||||
private synchronized void putMissing(StorageReceivedDeletedBlocks[] reports) {
|
||||
for (StorageReceivedDeletedBlocks r : reports) {
|
||||
pendingIBRs.get(r.getStorage()).putMissing(r.getBlocks());
|
||||
}
|
||||
if (reports.length > 0) {
|
||||
readyToSend = true;
|
||||
}
|
||||
}
|
||||
|
||||
/** Send IBRs to namenode. */
|
||||
void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration,
|
||||
String bpid, DataNodeMetrics metrics) throws IOException {
|
||||
// Generate a list of the pending reports for each storage under the lock
|
||||
final StorageReceivedDeletedBlocks[] reports = generateIBRs();
|
||||
if (reports.length == 0) {
|
||||
// Nothing new to report.
|
||||
return;
|
||||
}
|
||||
|
||||
// Send incremental block reports to the Namenode outside the lock
|
||||
boolean success = false;
|
||||
final long startTime = monotonicNow();
|
||||
try {
|
||||
namenode.blockReceivedAndDeleted(registration, bpid, reports);
|
||||
success = true;
|
||||
} finally {
|
||||
metrics.addIncrementalBlockReport(monotonicNow() - startTime);
|
||||
if (!success) {
|
||||
// If we didn't succeed in sending the report, put all of the
|
||||
// blocks back onto our queue, but only in the case where we
|
||||
// didn't put something newer in the meantime.
|
||||
putMissing(reports);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** @return the pending IBR for the given {@code storage} */
|
||||
private PerStorageIBR getPerStorageIBR(DatanodeStorage storage) {
|
||||
PerStorageIBR perStorage = pendingIBRs.get(storage);
|
||||
if (perStorage == null) {
|
||||
// This is the first time we are adding incremental BR state for
|
||||
// this storage so create a new map. This is required once per
|
||||
// storage, per service actor.
|
||||
perStorage = new PerStorageIBR();
|
||||
pendingIBRs.put(storage, perStorage);
|
||||
}
|
||||
return perStorage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a block for notification to NameNode.
|
||||
* If another entry exists for the same block it is removed.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
synchronized void addRDBI(ReceivedDeletedBlockInfo rdbi,
|
||||
DatanodeStorage storage) {
|
||||
// Make sure another entry for the same block is first removed.
|
||||
// There may only be one such entry.
|
||||
for (PerStorageIBR perStorage : pendingIBRs.values()) {
|
||||
if (perStorage.remove(rdbi.getBlock()) != null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
getPerStorageIBR(storage).put(rdbi);
|
||||
}
|
||||
|
||||
synchronized void notifyNamenodeBlock(ReceivedDeletedBlockInfo rdbi,
|
||||
DatanodeStorage storage) {
|
||||
addRDBI(rdbi, storage);
|
||||
|
||||
final BlockStatus status = rdbi.getStatus();
|
||||
if (status == BlockStatus.RECEIVING_BLOCK) {
|
||||
// the report will be sent out in the next heartbeat.
|
||||
readyToSend = true;
|
||||
} else if (status == BlockStatus.RECEIVED_BLOCK) {
|
||||
// the report is sent right away.
|
||||
triggerIBR();
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void triggerIBR() {
|
||||
readyToSend = true;
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
synchronized void triggerDeletionReportForTests() {
|
||||
triggerIBR();
|
||||
|
||||
while (sendImmediately()) {
|
||||
try {
|
||||
wait(100);
|
||||
} catch (InterruptedException e) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
|||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
|
||||
|
||||
|
@ -55,7 +56,6 @@ public class TestIncrementalBlockReports {
|
|||
private static final long DUMMY_BLOCK_GENSTAMP = 1000;
|
||||
|
||||
private MiniDFSCluster cluster = null;
|
||||
private DistributedFileSystem fs;
|
||||
private Configuration conf;
|
||||
private NameNode singletonNn;
|
||||
private DataNode singletonDn;
|
||||
|
@ -67,7 +67,6 @@ public class TestIncrementalBlockReports {
|
|||
public void startCluster() throws IOException {
|
||||
conf = new HdfsConfiguration();
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DN_COUNT).build();
|
||||
fs = cluster.getFileSystem();
|
||||
singletonNn = cluster.getNameNode();
|
||||
singletonDn = cluster.getDataNodes().get(0);
|
||||
bpos = singletonDn.getAllBpOs().get(0);
|
||||
|
@ -88,7 +87,8 @@ public class TestIncrementalBlockReports {
|
|||
private void injectBlockReceived() {
|
||||
ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
|
||||
getDummyBlock(), BlockStatus.RECEIVED_BLOCK, null);
|
||||
actor.notifyNamenodeBlock(rdbi, storageUuid, true);
|
||||
DatanodeStorage s = singletonDn.getFSDataset().getStorage(storageUuid);
|
||||
actor.getIbrManager().notifyNamenodeBlock(rdbi, s);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -97,7 +97,8 @@ public class TestIncrementalBlockReports {
|
|||
private void injectBlockDeleted() {
|
||||
ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
|
||||
getDummyBlock(), BlockStatus.DELETED_BLOCK, null);
|
||||
actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);
|
||||
actor.getIbrManager().addRDBI(rdbi,
|
||||
singletonDn.getFSDataset().getStorage(storageUuid));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -206,7 +207,7 @@ public class TestIncrementalBlockReports {
|
|||
any(StorageReceivedDeletedBlocks[].class));
|
||||
|
||||
// Ensure that no more IBRs are pending.
|
||||
assertFalse(actor.hasPendingIBR());
|
||||
assertFalse(actor.getIbrManager().sendImmediately());
|
||||
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
|||
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||
|
@ -94,12 +95,14 @@ public final class TestTriggerBlockReport {
|
|||
DataNode datanode = cluster.getDataNodes().get(0);
|
||||
BPServiceActor actor =
|
||||
datanode.getAllBpOs().get(0).getBPServiceActors().get(0);
|
||||
String storageUuid;
|
||||
final FsDatasetSpi<?> dataset = datanode.getFSDataset();
|
||||
final DatanodeStorage storage;
|
||||
try (FsDatasetSpi.FsVolumeReferences volumes =
|
||||
datanode.getFSDataset().getFsVolumeReferences()) {
|
||||
storageUuid = volumes.get(0).getStorageID();
|
||||
dataset.getFsVolumeReferences()) {
|
||||
storage = dataset.getStorage(volumes.get(0).getStorageID());
|
||||
}
|
||||
actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);
|
||||
|
||||
actor.getIbrManager().addRDBI(rdbi, storage);
|
||||
|
||||
// Manually trigger a block report.
|
||||
datanode.triggerBlockReport(
|
||||
|
|
Loading…
Reference in New Issue