HDFS-16333. fix balancer bug when transfer an EC block (#3679)
(cherry picked from commit 35556ea043
)
This commit is contained in:
parent
1ee661d7da
commit
55c0e676fd
|
@ -491,7 +491,7 @@ public class Dispatcher {
|
|||
|
||||
public static class DBlockStriped extends DBlock {
|
||||
|
||||
final byte[] indices;
|
||||
private byte[] indices;
|
||||
final short dataBlockNum;
|
||||
final int cellSize;
|
||||
|
||||
|
@ -528,6 +528,29 @@ public class Dispatcher {
|
|||
}
|
||||
return block.getNumBytes();
|
||||
}
|
||||
|
||||
public void setIndices(byte[] indices) {
|
||||
this.indices = indices;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adjust EC block indices,it will remove the element of adjustList from indices.
|
||||
* @param adjustList the list will be removed from indices
|
||||
*/
|
||||
public void adjustIndices(List<Integer> adjustList) {
|
||||
if (adjustList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
byte[] newIndices = new byte[indices.length - adjustList.size()];
|
||||
for (int i = 0, j = 0; i < indices.length; ++i) {
|
||||
if (!adjustList.contains(i)) {
|
||||
newIndices[j] = indices[i];
|
||||
++j;
|
||||
}
|
||||
}
|
||||
this.indices = newIndices;
|
||||
}
|
||||
}
|
||||
|
||||
/** The class represents a desired move. */
|
||||
|
@ -804,7 +827,7 @@ public class Dispatcher {
|
|||
*
|
||||
* @return the total size of the received blocks in the number of bytes.
|
||||
*/
|
||||
private long getBlockList() throws IOException {
|
||||
private long getBlockList() throws IOException, IllegalArgumentException {
|
||||
final long size = Math.min(getBlocksSize, blocksToReceive);
|
||||
final BlocksWithLocations newBlksLocs =
|
||||
nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize);
|
||||
|
@ -841,7 +864,14 @@ public class Dispatcher {
|
|||
synchronized (block) {
|
||||
block.clearLocations();
|
||||
|
||||
if (blkLocs instanceof StripedBlockWithLocations) {
|
||||
// EC block may adjust indices before, avoid repeated adjustments
|
||||
((DBlockStriped) block).setIndices(
|
||||
((StripedBlockWithLocations) blkLocs).getIndices());
|
||||
}
|
||||
|
||||
// update locations
|
||||
List<Integer> adjustList = new ArrayList<>();
|
||||
final String[] datanodeUuids = blkLocs.getDatanodeUuids();
|
||||
final StorageType[] storageTypes = blkLocs.getStorageTypes();
|
||||
for (int i = 0; i < datanodeUuids.length; i++) {
|
||||
|
@ -849,8 +879,20 @@ public class Dispatcher {
|
|||
datanodeUuids[i], storageTypes[i]);
|
||||
if (g != null) { // not unknown
|
||||
block.addLocation(g);
|
||||
} else if (blkLocs instanceof StripedBlockWithLocations) {
|
||||
// some datanode may not in storageGroupMap due to decommission operation
|
||||
// or balancer cli with "-exclude" parameter
|
||||
adjustList.add(i);
|
||||
}
|
||||
}
|
||||
|
||||
if (!adjustList.isEmpty()) {
|
||||
// block.locations mismatch with block.indices
|
||||
// adjust indices to get correct internalBlock for Datanode in #getInternalBlock
|
||||
((DBlockStriped) block).adjustIndices(adjustList);
|
||||
Preconditions.checkArgument(((DBlockStriped) block).indices.length
|
||||
== block.locations.size());
|
||||
}
|
||||
}
|
||||
if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
|
@ -970,7 +1012,7 @@ public class Dispatcher {
|
|||
}
|
||||
blocksToReceive -= received;
|
||||
continue;
|
||||
} catch (IOException e) {
|
||||
} catch (IOException | IllegalArgumentException e) {
|
||||
LOG.warn("Exception while getting reportedBlock list", e);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -468,6 +468,19 @@ public class TestBalancer {
|
|||
static void waitForBalancer(long totalUsedSpace, long totalCapacity,
|
||||
ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
|
||||
int expectedExcludedNodes) throws IOException, TimeoutException {
|
||||
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until balanced: each datanode gives utilization within.
|
||||
* BALANCE_ALLOWED_VARIANCE of average
|
||||
* @throws IOException
|
||||
* @throws TimeoutException
|
||||
*/
|
||||
static void waitForBalancer(long totalUsedSpace, long totalCapacity,
|
||||
ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
|
||||
int expectedExcludedNodes, boolean checkExcludeNodesUtilization)
|
||||
throws IOException, TimeoutException {
|
||||
long timeout = TIMEOUT;
|
||||
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
|
||||
: Time.monotonicNow() + timeout;
|
||||
|
@ -489,7 +502,9 @@ public class TestBalancer {
|
|||
double nodeUtilization = ((double)datanode.getDfsUsed())
|
||||
/ datanode.getCapacity();
|
||||
if (Dispatcher.Util.isExcluded(p.getExcludedNodes(), datanode)) {
|
||||
if (checkExcludeNodesUtilization) {
|
||||
assertTrue(nodeUtilization == 0);
|
||||
}
|
||||
actualExcludedNodeCount++;
|
||||
continue;
|
||||
}
|
||||
|
@ -774,6 +789,12 @@ public class TestBalancer {
|
|||
private void runBalancer(Configuration conf, long totalUsedSpace,
|
||||
long totalCapacity, BalancerParameters p, int excludedNodes)
|
||||
throws Exception {
|
||||
runBalancer(conf, totalUsedSpace, totalCapacity, p, excludedNodes, true);
|
||||
}
|
||||
|
||||
private void runBalancer(Configuration conf, long totalUsedSpace,
|
||||
long totalCapacity, BalancerParameters p, int excludedNodes,
|
||||
boolean checkExcludeNodesUtilization) throws Exception {
|
||||
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
||||
|
||||
int retry = 5;
|
||||
|
@ -794,7 +815,7 @@ public class TestBalancer {
|
|||
LOG.info(" .");
|
||||
try {
|
||||
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p,
|
||||
excludedNodes);
|
||||
excludedNodes, checkExcludeNodesUtilization);
|
||||
} catch (TimeoutException e) {
|
||||
// See HDFS-11682. NN may not get heartbeat to reflect the newest
|
||||
// block changes.
|
||||
|
@ -1628,6 +1649,103 @@ public class TestBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBalancerWithExcludeListWithStripedFile() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
initConfWithStripe(conf);
|
||||
NameNodeConnector.setWrite2IdFile(true);
|
||||
doTestBalancerWithExcludeListWithStripedFile(conf);
|
||||
NameNodeConnector.setWrite2IdFile(false);
|
||||
}
|
||||
|
||||
private void doTestBalancerWithExcludeListWithStripedFile(Configuration conf) throws Exception {
|
||||
int numOfDatanodes = dataBlocks + parityBlocks + 5;
|
||||
int numOfRacks = dataBlocks;
|
||||
long capacity = 20 * defaultBlockSize;
|
||||
long[] capacities = new long[numOfDatanodes];
|
||||
Arrays.fill(capacities, capacity);
|
||||
String[] racks = new String[numOfDatanodes];
|
||||
for (int i = 0; i < numOfDatanodes; i++) {
|
||||
racks[i] = "/rack" + (i % numOfRacks);
|
||||
}
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(numOfDatanodes)
|
||||
.racks(racks)
|
||||
.simulatedCapacities(capacities)
|
||||
.build();
|
||||
|
||||
try {
|
||||
cluster.waitActive();
|
||||
client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
|
||||
ClientProtocol.class).getProxy();
|
||||
client.enableErasureCodingPolicy(
|
||||
StripedFileTestUtil.getDefaultECPolicy().getName());
|
||||
client.setErasureCodingPolicy("/",
|
||||
StripedFileTestUtil.getDefaultECPolicy().getName());
|
||||
|
||||
long totalCapacity = sum(capacities);
|
||||
|
||||
// fill up the cluster with 30% data. It'll be 45% full plus parity.
|
||||
long fileLen = totalCapacity * 3 / 10;
|
||||
long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks;
|
||||
FileSystem fs = cluster.getFileSystem(0);
|
||||
DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, r.nextLong());
|
||||
|
||||
// verify locations of striped blocks
|
||||
LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
|
||||
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
|
||||
|
||||
// get datanode report
|
||||
DatanodeInfo[] datanodeReport = client.getDatanodeReport(DatanodeReportType.ALL);
|
||||
long totalBlocks = 0;
|
||||
for (DatanodeInfo dn : datanodeReport) {
|
||||
totalBlocks += dn.getNumBlocks();
|
||||
}
|
||||
|
||||
// add datanode in new rack
|
||||
String newRack = "/rack" + (++numOfRacks);
|
||||
cluster.startDataNodes(conf, 2, true, null,
|
||||
new String[]{newRack, newRack}, null,
|
||||
new long[]{capacity, capacity});
|
||||
totalCapacity += capacity*2;
|
||||
cluster.triggerHeartbeats();
|
||||
|
||||
// add datanode to exclude list
|
||||
Set<String> excludedList = new HashSet<>();
|
||||
excludedList.add(datanodeReport[0].getXferAddr());
|
||||
BalancerParameters.Builder pBuilder = new BalancerParameters.Builder();
|
||||
pBuilder.setExcludedNodes(excludedList);
|
||||
|
||||
// start balancer and check the failed num of moving task
|
||||
runBalancer(conf, totalUsedSpace, totalCapacity, pBuilder.build(),
|
||||
excludedList.size(), false);
|
||||
|
||||
// check total blocks, max wait time 60s
|
||||
final long blocksBeforeBalancer = totalBlocks;
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
DatanodeInfo[] datanodeInfos = null;
|
||||
try {
|
||||
cluster.triggerHeartbeats();
|
||||
datanodeInfos = client.getDatanodeReport(DatanodeReportType.ALL);
|
||||
} catch (IOException e) {
|
||||
Assert.fail(e.getMessage());
|
||||
}
|
||||
long blocksAfterBalancer = 0;
|
||||
for (DatanodeInfo dn : datanodeInfos) {
|
||||
blocksAfterBalancer += dn.getNumBlocks();
|
||||
}
|
||||
return blocksBeforeBalancer == blocksAfterBalancer;
|
||||
}, 3000, 60000);
|
||||
|
||||
// verify locations of striped blocks
|
||||
locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
|
||||
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
|
||||
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private void testNullStripedBlocks(Configuration conf) throws IOException {
|
||||
NameNodeConnector nnc = NameNodeConnector.newNameNodeConnectors(
|
||||
DFSUtil.getInternalNsRpcUris(conf),
|
||||
|
|
Loading…
Reference in New Issue