Revert "HDFS-10301. Interleaving processing of storages from repeated block reports causes false zombie storage detection, removes valid blocks. Contributed by Vinitha Gankidi."
This reverts commit 9ab153d7ae
.
This commit is contained in:
parent
f39ed9ea8e
commit
9b3b7703b1
|
@ -63,34 +63,6 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
|
||||||
public Iterator<BlockReportReplica> iterator() {
|
public Iterator<BlockReportReplica> iterator() {
|
||||||
return Collections.emptyIterator();
|
return Collections.emptyIterator();
|
||||||
}
|
}
|
||||||
@Override
|
|
||||||
public boolean isStorageReport() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// STORAGE_REPORT is used to report all storages in the DN
|
|
||||||
public static final BlockListAsLongs STORAGE_REPORT = new BlockListAsLongs() {
|
|
||||||
@Override
|
|
||||||
public int getNumberOfBlocks() {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public ByteString getBlocksBuffer() {
|
|
||||||
return ByteString.EMPTY;
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public long[] getBlockListAsLongs() {
|
|
||||||
return EMPTY_LONGS;
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public Iterator<BlockReportReplica> iterator() {
|
|
||||||
return Collections.emptyIterator();
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public boolean isStorageReport() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -280,13 +252,6 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
|
||||||
*/
|
*/
|
||||||
abstract public long[] getBlockListAsLongs();
|
abstract public long[] getBlockListAsLongs();
|
||||||
|
|
||||||
/**
|
|
||||||
* Return true for STORAGE_REPORT BlocksListsAsLongs.
|
|
||||||
* Otherwise return false.
|
|
||||||
* @return boolean
|
|
||||||
*/
|
|
||||||
abstract public boolean isStorageReport();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a singleton iterator over blocks in the block report. Do not
|
* Returns a singleton iterator over blocks in the block report. Do not
|
||||||
* add the returned blocks to a collection.
|
* add the returned blocks to a collection.
|
||||||
|
@ -426,11 +391,6 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
|
||||||
return longs;
|
return longs;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isStorageReport() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterator<BlockReportReplica> iterator() {
|
public Iterator<BlockReportReplica> iterator() {
|
||||||
return new Iterator<BlockReportReplica>() {
|
return new Iterator<BlockReportReplica>() {
|
||||||
|
@ -514,11 +474,6 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
|
||||||
return longs;
|
return longs;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isStorageReport() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterator<BlockReportReplica> iterator() {
|
public Iterator<BlockReportReplica> iterator() {
|
||||||
return new Iterator<BlockReportReplica>() {
|
return new Iterator<BlockReportReplica>() {
|
||||||
|
|
|
@ -1886,8 +1886,8 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
*/
|
*/
|
||||||
public boolean processReport(final DatanodeID nodeID,
|
public boolean processReport(final DatanodeID nodeID,
|
||||||
final DatanodeStorage storage,
|
final DatanodeStorage storage,
|
||||||
final BlockListAsLongs newReport,
|
final BlockListAsLongs newReport, BlockReportContext context,
|
||||||
BlockReportContext context) throws IOException {
|
boolean lastStorageInRpc) throws IOException {
|
||||||
namesystem.writeLock();
|
namesystem.writeLock();
|
||||||
final long startTime = Time.monotonicNow(); //after acquiring write lock
|
final long startTime = Time.monotonicNow(); //after acquiring write lock
|
||||||
final long endTime;
|
final long endTime;
|
||||||
|
@ -1937,14 +1937,30 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
|
|
||||||
storageInfo.receivedBlockReport();
|
storageInfo.receivedBlockReport();
|
||||||
if (context != null) {
|
if (context != null) {
|
||||||
if (context.getTotalRpcs() == context.getCurRpc() + 1) {
|
storageInfo.setLastBlockReportId(context.getReportId());
|
||||||
long leaseId = this.getBlockReportLeaseManager().removeLease(node);
|
if (lastStorageInRpc) {
|
||||||
|
int rpcsSeen = node.updateBlockReportContext(context);
|
||||||
|
if (rpcsSeen >= context.getTotalRpcs()) {
|
||||||
|
long leaseId = blockReportLeaseManager.removeLease(node);
|
||||||
BlockManagerFaultInjector.getInstance().
|
BlockManagerFaultInjector.getInstance().
|
||||||
removeBlockReportLease(node, leaseId);
|
removeBlockReportLease(node, leaseId);
|
||||||
|
List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
|
||||||
|
if (zombies.isEmpty()) {
|
||||||
|
LOG.debug("processReport 0x{}: no zombie storages found.",
|
||||||
|
Long.toHexString(context.getReportId()));
|
||||||
|
} else {
|
||||||
|
for (DatanodeStorageInfo zombie : zombies) {
|
||||||
|
removeZombieReplicas(context, zombie);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
node.clearBlockReportContext();
|
||||||
|
} else {
|
||||||
|
LOG.debug("processReport 0x{}: {} more RPCs remaining in this " +
|
||||||
|
"report.", Long.toHexString(context.getReportId()),
|
||||||
|
(context.getTotalRpcs() - rpcsSeen)
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
LOG.debug("Processing RPC with index {} out of total {} RPCs in "
|
|
||||||
+ "processReport 0x{}", context.getCurRpc(),
|
|
||||||
context.getTotalRpcs(), Long.toHexString(context.getReportId()));
|
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
endTime = Time.monotonicNow();
|
endTime = Time.monotonicNow();
|
||||||
|
@ -1970,26 +1986,6 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
return !node.hasStaleStorages();
|
return !node.hasStaleStorages();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeZombieStorages(DatanodeRegistration nodeReg,
|
|
||||||
BlockReportContext context, Set<String> storageIDsInBlockReport)
|
|
||||||
throws UnregisteredNodeException {
|
|
||||||
namesystem.writeLock();
|
|
||||||
DatanodeDescriptor node = this.getDatanodeManager().getDatanode(nodeReg);
|
|
||||||
if (node != null) {
|
|
||||||
List<DatanodeStorageInfo> zombies =
|
|
||||||
node.removeZombieStorages(storageIDsInBlockReport);
|
|
||||||
if (zombies.isEmpty()) {
|
|
||||||
LOG.debug("processReport 0x{}: no zombie storages found.",
|
|
||||||
Long.toHexString(context.getReportId()));
|
|
||||||
} else {
|
|
||||||
for (DatanodeStorageInfo zombie : zombies) {
|
|
||||||
this.removeZombieReplicas(context, zombie);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
namesystem.writeUnlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void removeZombieReplicas(BlockReportContext context,
|
private void removeZombieReplicas(BlockReportContext context,
|
||||||
DatanodeStorageInfo zombie) {
|
DatanodeStorageInfo zombie) {
|
||||||
LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
|
LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
|
||||||
|
|
|
@ -308,10 +308,10 @@ class BlockReportLeaseManager {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (node.leaseId == 0) {
|
if (node.leaseId == 0) {
|
||||||
LOG.warn("BR lease 0x{} is not found for DN {}, because the DN " +
|
LOG.warn("BR lease 0x{} is not valid for DN {}, because the DN " +
|
||||||
"is not in the pending set.",
|
"is not in the pending set.",
|
||||||
Long.toHexString(id), dn.getDatanodeUuid());
|
Long.toHexString(id), dn.getDatanodeUuid());
|
||||||
return true;
|
return false;
|
||||||
}
|
}
|
||||||
if (pruneIfExpired(monotonicNowMs, node)) {
|
if (pruneIfExpired(monotonicNowMs, node)) {
|
||||||
LOG.warn("BR lease 0x{} is not valid for DN {}, because the lease " +
|
LOG.warn("BR lease 0x{} is not valid for DN {}, because the lease " +
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.BitSet;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -40,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
|
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||||
|
@ -151,6 +153,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
public final DecommissioningStatus decommissioningStatus =
|
public final DecommissioningStatus decommissioningStatus =
|
||||||
new DecommissioningStatus();
|
new DecommissioningStatus();
|
||||||
|
|
||||||
|
private long curBlockReportId = 0;
|
||||||
|
|
||||||
|
private BitSet curBlockReportRpcsSeen = null;
|
||||||
|
|
||||||
private final Map<String, DatanodeStorageInfo> storageMap =
|
private final Map<String, DatanodeStorageInfo> storageMap =
|
||||||
new HashMap<>();
|
new HashMap<>();
|
||||||
|
@ -248,6 +253,20 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
|
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int updateBlockReportContext(BlockReportContext context) {
|
||||||
|
if (curBlockReportId != context.getReportId()) {
|
||||||
|
curBlockReportId = context.getReportId();
|
||||||
|
curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
|
||||||
|
}
|
||||||
|
curBlockReportRpcsSeen.set(context.getCurRpc());
|
||||||
|
return curBlockReportRpcsSeen.cardinality();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void clearBlockReportContext() {
|
||||||
|
curBlockReportId = 0;
|
||||||
|
curBlockReportRpcsSeen = null;
|
||||||
|
}
|
||||||
|
|
||||||
public CachedBlocksList getPendingCached() {
|
public CachedBlocksList getPendingCached() {
|
||||||
return pendingCached;
|
return pendingCached;
|
||||||
}
|
}
|
||||||
|
@ -311,8 +330,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
List<DatanodeStorageInfo>
|
List<DatanodeStorageInfo> removeZombieStorages() {
|
||||||
removeZombieStorages(Set<String> storageIDsInBlockReport) {
|
|
||||||
List<DatanodeStorageInfo> zombies = null;
|
List<DatanodeStorageInfo> zombies = null;
|
||||||
synchronized (storageMap) {
|
synchronized (storageMap) {
|
||||||
Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
|
Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
|
||||||
|
@ -320,13 +338,18 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
|
Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
|
||||||
DatanodeStorageInfo storageInfo = entry.getValue();
|
DatanodeStorageInfo storageInfo = entry.getValue();
|
||||||
if (!storageIDsInBlockReport.contains(storageInfo.getStorageID())) {
|
if (storageInfo.getLastBlockReportId() != curBlockReportId) {
|
||||||
|
LOG.info("{} had lastBlockReportId 0x{} but curBlockReportId = 0x{}",
|
||||||
|
storageInfo.getStorageID(),
|
||||||
|
Long.toHexString(storageInfo.getLastBlockReportId()),
|
||||||
|
Long.toHexString(curBlockReportId));
|
||||||
iter.remove();
|
iter.remove();
|
||||||
if (zombies == null) {
|
if (zombies == null) {
|
||||||
zombies = new LinkedList<>();
|
zombies = new LinkedList<>();
|
||||||
}
|
}
|
||||||
zombies.add(storageInfo);
|
zombies.add(storageInfo);
|
||||||
}
|
}
|
||||||
|
storageInfo.setLastBlockReportId(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
|
return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
|
||||||
|
|
|
@ -122,6 +122,9 @@ public class DatanodeStorageInfo {
|
||||||
private volatile BlockInfo blockList = null;
|
private volatile BlockInfo blockList = null;
|
||||||
private int numBlocks = 0;
|
private int numBlocks = 0;
|
||||||
|
|
||||||
|
// The ID of the last full block report which updated this storage.
|
||||||
|
private long lastBlockReportId = 0;
|
||||||
|
|
||||||
/** The number of block reports received */
|
/** The number of block reports received */
|
||||||
private int blockReportCount = 0;
|
private int blockReportCount = 0;
|
||||||
|
|
||||||
|
@ -186,6 +189,14 @@ public class DatanodeStorageInfo {
|
||||||
this.blockPoolUsed = blockPoolUsed;
|
this.blockPoolUsed = blockPoolUsed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long getLastBlockReportId() {
|
||||||
|
return lastBlockReportId;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setLastBlockReportId(long lastBlockReportId) {
|
||||||
|
this.lastBlockReportId = lastBlockReportId;
|
||||||
|
}
|
||||||
|
|
||||||
State getState() {
|
State getState() {
|
||||||
return this.state;
|
return this.state;
|
||||||
}
|
}
|
||||||
|
|
|
@ -367,36 +367,11 @@ class BPServiceActor implements Runnable {
|
||||||
} else {
|
} else {
|
||||||
// Send one block report per message.
|
// Send one block report per message.
|
||||||
for (int r = 0; r < reports.length; r++) {
|
for (int r = 0; r < reports.length; r++) {
|
||||||
StorageBlockReport[] singleReport = {reports[r]};
|
StorageBlockReport singleReport[] = { reports[r] };
|
||||||
DatanodeCommand cmd;
|
DatanodeCommand cmd = bpNamenode.blockReport(
|
||||||
if (r != reports.length - 1) {
|
bpRegistration, bpos.getBlockPoolId(), singleReport,
|
||||||
cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(),
|
|
||||||
singleReport, new BlockReportContext(reports.length, r,
|
|
||||||
reportId, fullBrLeaseId));
|
|
||||||
} else {
|
|
||||||
StorageBlockReport[] lastSplitReport =
|
|
||||||
new StorageBlockReport[perVolumeBlockLists.size()];
|
|
||||||
// When block reports are split, the last RPC in the block report
|
|
||||||
// has the information about all storages in the block report.
|
|
||||||
// See HDFS-10301 for more details. To achieve this, the last RPC
|
|
||||||
// has 'n' storage reports, where 'n' is the number of storages in
|
|
||||||
// a DN. The actual block replicas are reported only for the
|
|
||||||
// last/n-th storage.
|
|
||||||
i = 0;
|
|
||||||
for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair :
|
|
||||||
perVolumeBlockLists.entrySet()) {
|
|
||||||
lastSplitReport[i++] = new StorageBlockReport(
|
|
||||||
kvPair.getKey(), BlockListAsLongs.STORAGE_REPORT);
|
|
||||||
if (i == r) {
|
|
||||||
lastSplitReport[i] = reports[r];
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cmd = bpNamenode.blockReport(
|
|
||||||
bpRegistration, bpos.getBlockPoolId(), lastSplitReport,
|
|
||||||
new BlockReportContext(reports.length, r, reportId,
|
new BlockReportContext(reports.length, r, reportId,
|
||||||
fullBrLeaseId));
|
fullBrLeaseId));
|
||||||
}
|
|
||||||
numReportsSent++;
|
numReportsSent++;
|
||||||
numRPCs++;
|
numRPCs++;
|
||||||
if (cmd != null) {
|
if (cmd != null) {
|
||||||
|
|
|
@ -1415,7 +1415,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
boolean noStaleStorages = false;
|
boolean noStaleStorages = false;
|
||||||
for (int r = 0; r < reports.length; r++) {
|
for (int r = 0; r < reports.length; r++) {
|
||||||
final BlockListAsLongs blocks = reports[r].getBlocks();
|
final BlockListAsLongs blocks = reports[r].getBlocks();
|
||||||
if (!blocks.isStorageReport()) {
|
|
||||||
//
|
//
|
||||||
// BlockManager.processReport accumulates information of prior calls
|
// BlockManager.processReport accumulates information of prior calls
|
||||||
// for the same node and storage, so the value returned by the last
|
// for the same node and storage, so the value returned by the last
|
||||||
|
@ -1424,27 +1423,16 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
final int index = r;
|
final int index = r;
|
||||||
noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
|
noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
|
||||||
@Override
|
@Override
|
||||||
public Boolean call()
|
public Boolean call() throws IOException {
|
||||||
throws IOException {
|
|
||||||
return bm.processReport(nodeReg, reports[index].getStorage(),
|
return bm.processReport(nodeReg, reports[index].getStorage(),
|
||||||
blocks, context);
|
blocks, context, (index == reports.length - 1));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
metrics.incrStorageBlockReportOps();
|
metrics.incrStorageBlockReportOps();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
BlockManagerFaultInjector.getInstance().
|
BlockManagerFaultInjector.getInstance().
|
||||||
incomingBlockReportRpc(nodeReg, context);
|
incomingBlockReportRpc(nodeReg, context);
|
||||||
|
|
||||||
if (nn.getFSImage().isUpgradeFinalized() &&
|
|
||||||
context.getTotalRpcs() == context.getCurRpc() + 1) {
|
|
||||||
Set<String> storageIDsInBlockReport = new HashSet<>();
|
|
||||||
for (StorageBlockReport report : reports) {
|
|
||||||
storageIDsInBlockReport.add(report.getStorage().getStorageID());
|
|
||||||
}
|
|
||||||
bm.removeZombieStorages(nodeReg, context, storageIDsInBlockReport);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nn.getFSImage().isUpgradeFinalized() &&
|
if (nn.getFSImage().isUpgradeFinalized() &&
|
||||||
!namesystem.isRollingUpgrade() &&
|
!namesystem.isRollingUpgrade() &&
|
||||||
!nn.isStandbyState() &&
|
!nn.isStandbyState() &&
|
||||||
|
|
|
@ -710,12 +710,12 @@ public class TestBlockManager {
|
||||||
reset(node);
|
reset(node);
|
||||||
|
|
||||||
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
|
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
|
||||||
BlockListAsLongs.EMPTY, null);
|
BlockListAsLongs.EMPTY, null, false);
|
||||||
assertEquals(1, ds.getBlockReportCount());
|
assertEquals(1, ds.getBlockReportCount());
|
||||||
// send block report again, should NOT be processed
|
// send block report again, should NOT be processed
|
||||||
reset(node);
|
reset(node);
|
||||||
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
|
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
|
||||||
BlockListAsLongs.EMPTY, null);
|
BlockListAsLongs.EMPTY, null, false);
|
||||||
assertEquals(1, ds.getBlockReportCount());
|
assertEquals(1, ds.getBlockReportCount());
|
||||||
|
|
||||||
// re-register as if node restarted, should update existing node
|
// re-register as if node restarted, should update existing node
|
||||||
|
@ -726,7 +726,7 @@ public class TestBlockManager {
|
||||||
// send block report, should be processed after restart
|
// send block report, should be processed after restart
|
||||||
reset(node);
|
reset(node);
|
||||||
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
|
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
|
||||||
BlockListAsLongs.EMPTY, null);
|
BlockListAsLongs.EMPTY, null, false);
|
||||||
// Reinitialize as registration with empty storage list pruned
|
// Reinitialize as registration with empty storage list pruned
|
||||||
// node.storageMap.
|
// node.storageMap.
|
||||||
ds = node.getStorageInfos()[0];
|
ds = node.getStorageInfos()[0];
|
||||||
|
@ -755,7 +755,7 @@ public class TestBlockManager {
|
||||||
reset(node);
|
reset(node);
|
||||||
doReturn(1).when(node).numBlocks();
|
doReturn(1).when(node).numBlocks();
|
||||||
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
|
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
|
||||||
BlockListAsLongs.EMPTY, null);
|
BlockListAsLongs.EMPTY, null, false);
|
||||||
assertEquals(1, ds.getBlockReportCount());
|
assertEquals(1, ds.getBlockReportCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -828,7 +828,7 @@ public class TestBlockManager {
|
||||||
// Make sure it's the first full report
|
// Make sure it's the first full report
|
||||||
assertEquals(0, ds.getBlockReportCount());
|
assertEquals(0, ds.getBlockReportCount());
|
||||||
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
|
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
|
||||||
builder.build(), null);
|
builder.build(), null, false);
|
||||||
assertEquals(1, ds.getBlockReportCount());
|
assertEquals(1, ds.getBlockReportCount());
|
||||||
|
|
||||||
// verify the storage info is correct
|
// verify the storage info is correct
|
||||||
|
|
|
@ -19,40 +19,34 @@
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.BufferedOutputStream;
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.BufferedWriter;
|
import java.io.BufferedWriter;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -61,6 +55,8 @@ import java.io.FileReader;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStreamWriter;
|
import java.io.OutputStreamWriter;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
@ -369,67 +365,4 @@ public class TestNameNodePrunesMissingStorages {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=300000)
|
|
||||||
public void testInterleavedFullBlockReports() throws Exception {
|
|
||||||
Configuration conf = new HdfsConfiguration();
|
|
||||||
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
|
|
||||||
36000000L);
|
|
||||||
int numStoragesPerDatanode = 6;
|
|
||||||
final MiniDFSCluster cluster = new MiniDFSCluster
|
|
||||||
.Builder(conf).numDataNodes(1)
|
|
||||||
.storagesPerDatanode(numStoragesPerDatanode)
|
|
||||||
.build();
|
|
||||||
try {
|
|
||||||
LOG.info("waiting for cluster to become active...");
|
|
||||||
cluster.waitActive();
|
|
||||||
// Get the datanode registration and the block reports
|
|
||||||
DataNode dn = cluster.getDataNodes().get(0);
|
|
||||||
final String blockPoolId = cluster.getNamesystem().getBlockPoolId();
|
|
||||||
LOG.info("Block pool id: " + blockPoolId);
|
|
||||||
final DatanodeRegistration dnR = dn.getDNRegistrationForBP(blockPoolId);
|
|
||||||
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
|
|
||||||
dn.getFSDataset().getBlockReports(blockPoolId);
|
|
||||||
final StorageBlockReport[] reports =
|
|
||||||
new StorageBlockReport[perVolumeBlockLists.size()];
|
|
||||||
int reportIndex = 0;
|
|
||||||
for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair :
|
|
||||||
perVolumeBlockLists.entrySet()) {
|
|
||||||
DatanodeStorage dnStorage = kvPair.getKey();
|
|
||||||
BlockListAsLongs blockList = kvPair.getValue();
|
|
||||||
reports[reportIndex++] =
|
|
||||||
new StorageBlockReport(dnStorage, blockList);
|
|
||||||
}
|
|
||||||
// Get the list of storage ids associated with the datanode
|
|
||||||
// before the test
|
|
||||||
BlockManager bm =
|
|
||||||
cluster.getNameNode().getNamesystem().getBlockManager();
|
|
||||||
final DatanodeDescriptor dnDescriptor = bm.getDatanodeManager().
|
|
||||||
getDatanode(cluster.getDataNodes().get(0).getDatanodeUuid());
|
|
||||||
DatanodeStorageInfo[] storageInfos = dnDescriptor.getStorageInfos();
|
|
||||||
// Send the full block report concurrently using
|
|
||||||
// numThreads=numStoragesPerDatanode
|
|
||||||
ExecutorService executorService = Executors.
|
|
||||||
newFixedThreadPool(numStoragesPerDatanode);
|
|
||||||
List<Future<DatanodeCommand>> futureList =
|
|
||||||
new ArrayList<>(numStoragesPerDatanode);
|
|
||||||
for (int i = 0; i < numStoragesPerDatanode; i++) {
|
|
||||||
futureList.add(executorService.submit(new Callable<DatanodeCommand>() {
|
|
||||||
@Override
|
|
||||||
public DatanodeCommand call() throws IOException {
|
|
||||||
return cluster.getNameNodeRpc().blockReport(dnR, blockPoolId,
|
|
||||||
reports, new BlockReportContext(1, 0, System.nanoTime(), 0L));
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
for (Future<DatanodeCommand> future: futureList) {
|
|
||||||
future.get();
|
|
||||||
}
|
|
||||||
executorService.shutdown();
|
|
||||||
// Verify that the storages match before and after the test
|
|
||||||
Assert.assertArrayEquals(storageInfos, dnDescriptor.getStorageInfos());
|
|
||||||
} finally {
|
|
||||||
cluster.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,6 @@ import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import static org.hamcrest.core.Is.is;
|
import static org.hamcrest.core.Is.is;
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.mockito.Matchers.*;
|
import static org.mockito.Matchers.*;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
|
@ -89,34 +88,6 @@ public class TestDnRespectsBlockReportSplitThreshold {
|
||||||
blockCount * BLOCK_SIZE, BLOCK_SIZE, REPL_FACTOR, seed);
|
blockCount * BLOCK_SIZE, BLOCK_SIZE, REPL_FACTOR, seed);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyCapturedArgumentsSplit(
|
|
||||||
ArgumentCaptor<StorageBlockReport[]> captor,
|
|
||||||
int expectedReportsPerCall,
|
|
||||||
int expectedTotalBlockCount) {
|
|
||||||
List<StorageBlockReport[]> listOfReports = captor.getAllValues();
|
|
||||||
int numBlocksReported = 0;
|
|
||||||
int storageIndex = 0;
|
|
||||||
int listOfReportsSize = listOfReports.size();
|
|
||||||
for (StorageBlockReport[] reports : listOfReports) {
|
|
||||||
if (storageIndex < (listOfReportsSize - 1)) {
|
|
||||||
assertThat(reports.length, is(expectedReportsPerCall));
|
|
||||||
} else {
|
|
||||||
assertThat(reports.length, is(listOfReportsSize));
|
|
||||||
}
|
|
||||||
for (StorageBlockReport report : reports) {
|
|
||||||
BlockListAsLongs blockList = report.getBlocks();
|
|
||||||
if (!blockList.isStorageReport()) {
|
|
||||||
numBlocksReported += blockList.getNumberOfBlocks();
|
|
||||||
} else {
|
|
||||||
assertEquals(blockList.getNumberOfBlocks(), -1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
storageIndex++;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(numBlocksReported >= expectedTotalBlockCount);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void verifyCapturedArguments(
|
private void verifyCapturedArguments(
|
||||||
ArgumentCaptor<StorageBlockReport[]> captor,
|
ArgumentCaptor<StorageBlockReport[]> captor,
|
||||||
int expectedReportsPerCall,
|
int expectedReportsPerCall,
|
||||||
|
@ -165,7 +136,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
|
||||||
anyString(),
|
anyString(),
|
||||||
captor.capture(), Mockito.<BlockReportContext>anyObject());
|
captor.capture(), Mockito.<BlockReportContext>anyObject());
|
||||||
|
|
||||||
verifyCapturedArgumentsSplit(captor, 1, BLOCKS_IN_FILE);
|
verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -229,7 +200,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
|
||||||
anyString(),
|
anyString(),
|
||||||
captor.capture(), Mockito.<BlockReportContext>anyObject());
|
captor.capture(), Mockito.<BlockReportContext>anyObject());
|
||||||
|
|
||||||
verifyCapturedArgumentsSplit(captor, 1, BLOCKS_IN_FILE);
|
verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||||
|
@ -36,32 +35,13 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
|
||||||
@Override
|
@Override
|
||||||
protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
|
protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
|
||||||
StorageBlockReport[] reports) throws IOException {
|
StorageBlockReport[] reports) throws IOException {
|
||||||
for (int r = 0; r < reports.length; r++) {
|
int i = 0;
|
||||||
LOG.info("Sending block report for storage " +
|
for (StorageBlockReport report : reports) {
|
||||||
reports[r].getStorage().getStorageID());
|
LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
|
||||||
StorageBlockReport[] singletonReport = {reports[r]};
|
StorageBlockReport[] singletonReport = { report };
|
||||||
if (r != reports.length - 1) {
|
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
|
||||||
new BlockReportContext(reports.length, r, System.nanoTime(),
|
new BlockReportContext(reports.length, i, System.nanoTime(), 0L));
|
||||||
0L));
|
i++;
|
||||||
} else {
|
|
||||||
StorageBlockReport[] lastSplitReport =
|
|
||||||
new StorageBlockReport[reports.length];
|
|
||||||
// When block reports are split, send a dummy storage report for all
|
|
||||||
// other storages in the blockreport along with the last storage report
|
|
||||||
for (int i = 0; i <= r; i++) {
|
|
||||||
if (i == r) {
|
|
||||||
lastSplitReport[i] = reports[r];
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
lastSplitReport[i] =
|
|
||||||
new StorageBlockReport(reports[i].getStorage(),
|
|
||||||
BlockListAsLongs.STORAGE_REPORT);
|
|
||||||
}
|
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, lastSplitReport,
|
|
||||||
new BlockReportContext(reports.length, r, System.nanoTime(),
|
|
||||||
0L));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue