HDFS-16333. fix balancer bug when transfer an EC block (#3777)

This commit is contained in:
qinyuren 2021-12-09 23:33:03 +08:00 committed by Chao Sun
parent 52c5bbc993
commit 04f0025ad2
2 changed files with 166 additions and 5 deletions

View File

@ -491,7 +491,7 @@ public class Dispatcher {
public static class DBlockStriped extends DBlock { public static class DBlockStriped extends DBlock {
final byte[] indices; private byte[] indices;
final short dataBlockNum; final short dataBlockNum;
final int cellSize; final int cellSize;
@ -528,6 +528,29 @@ public class Dispatcher {
} }
return block.getNumBytes(); return block.getNumBytes();
} }
public void setIndices(byte[] indices) {
this.indices = indices;
}
/**
* Adjust EC block indicesit will remove the element of adjustList from indices.
* @param adjustList the list will be removed from indices
*/
public void adjustIndices(List<Integer> adjustList) {
if (adjustList.isEmpty()) {
return;
}
byte[] newIndices = new byte[indices.length - adjustList.size()];
for (int i = 0, j = 0; i < indices.length; ++i) {
if (!adjustList.contains(i)) {
newIndices[j] = indices[i];
++j;
}
}
this.indices = newIndices;
}
} }
/** The class represents a desired move. */ /** The class represents a desired move. */
@ -804,7 +827,7 @@ public class Dispatcher {
* *
* @return the total size of the received blocks in the number of bytes. * @return the total size of the received blocks in the number of bytes.
*/ */
private long getBlockList() throws IOException { private long getBlockList() throws IOException, IllegalArgumentException {
final long size = Math.min(getBlocksSize, blocksToReceive); final long size = Math.min(getBlocksSize, blocksToReceive);
final BlocksWithLocations newBlksLocs = final BlocksWithLocations newBlksLocs =
nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize); nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize);
@ -841,7 +864,14 @@ public class Dispatcher {
synchronized (block) { synchronized (block) {
block.clearLocations(); block.clearLocations();
if (blkLocs instanceof StripedBlockWithLocations) {
// EC block may adjust indices before, avoid repeated adjustments
((DBlockStriped) block).setIndices(
((StripedBlockWithLocations) blkLocs).getIndices());
}
// update locations // update locations
List<Integer> adjustList = new ArrayList<>();
final String[] datanodeUuids = blkLocs.getDatanodeUuids(); final String[] datanodeUuids = blkLocs.getDatanodeUuids();
final StorageType[] storageTypes = blkLocs.getStorageTypes(); final StorageType[] storageTypes = blkLocs.getStorageTypes();
for (int i = 0; i < datanodeUuids.length; i++) { for (int i = 0; i < datanodeUuids.length; i++) {
@ -849,8 +879,20 @@ public class Dispatcher {
datanodeUuids[i], storageTypes[i]); datanodeUuids[i], storageTypes[i]);
if (g != null) { // not unknown if (g != null) { // not unknown
block.addLocation(g); block.addLocation(g);
} else if (blkLocs instanceof StripedBlockWithLocations) {
// some datanode may not in storageGroupMap due to decommission operation
// or balancer cli with "-exclude" parameter
adjustList.add(i);
} }
} }
if (!adjustList.isEmpty()) {
// block.locations mismatch with block.indices
// adjust indices to get correct internalBlock for Datanode in #getInternalBlock
((DBlockStriped) block).adjustIndices(adjustList);
Preconditions.checkArgument(((DBlockStriped) block).indices.length
== block.locations.size());
}
} }
if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) { if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -970,7 +1012,7 @@ public class Dispatcher {
} }
blocksToReceive -= received; blocksToReceive -= received;
continue; continue;
} catch (IOException e) { } catch (IOException | IllegalArgumentException e) {
LOG.warn("Exception while getting reportedBlock list", e); LOG.warn("Exception while getting reportedBlock list", e);
return; return;
} }

View File

@ -72,6 +72,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -468,6 +469,19 @@ public class TestBalancer {
static void waitForBalancer(long totalUsedSpace, long totalCapacity, static void waitForBalancer(long totalUsedSpace, long totalCapacity,
ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p, ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
int expectedExcludedNodes) throws IOException, TimeoutException { int expectedExcludedNodes) throws IOException, TimeoutException {
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes, true);
}
/**
* Wait until balanced: each datanode gives utilization within.
* BALANCE_ALLOWED_VARIANCE of average
* @throws IOException
* @throws TimeoutException
*/
static void waitForBalancer(long totalUsedSpace, long totalCapacity,
ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
int expectedExcludedNodes, boolean checkExcludeNodesUtilization)
throws IOException, TimeoutException {
long timeout = TIMEOUT; long timeout = TIMEOUT;
long failtime = (timeout <= 0L) ? Long.MAX_VALUE long failtime = (timeout <= 0L) ? Long.MAX_VALUE
: Time.monotonicNow() + timeout; : Time.monotonicNow() + timeout;
@ -489,7 +503,9 @@ public class TestBalancer {
double nodeUtilization = ((double)datanode.getDfsUsed()) double nodeUtilization = ((double)datanode.getDfsUsed())
/ datanode.getCapacity(); / datanode.getCapacity();
if (Dispatcher.Util.isExcluded(p.getExcludedNodes(), datanode)) { if (Dispatcher.Util.isExcluded(p.getExcludedNodes(), datanode)) {
assertTrue(nodeUtilization == 0); if (checkExcludeNodesUtilization) {
assertTrue(nodeUtilization == 0);
}
actualExcludedNodeCount++; actualExcludedNodeCount++;
continue; continue;
} }
@ -774,6 +790,12 @@ public class TestBalancer {
private void runBalancer(Configuration conf, long totalUsedSpace, private void runBalancer(Configuration conf, long totalUsedSpace,
long totalCapacity, BalancerParameters p, int excludedNodes) long totalCapacity, BalancerParameters p, int excludedNodes)
throws Exception { throws Exception {
runBalancer(conf, totalUsedSpace, totalCapacity, p, excludedNodes, true);
}
private void runBalancer(Configuration conf, long totalUsedSpace,
long totalCapacity, BalancerParameters p, int excludedNodes,
boolean checkExcludeNodesUtilization) throws Exception {
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
int retry = 5; int retry = 5;
@ -794,7 +816,7 @@ public class TestBalancer {
LOG.info(" ."); LOG.info(" .");
try { try {
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p,
excludedNodes); excludedNodes, checkExcludeNodesUtilization);
} catch (TimeoutException e) { } catch (TimeoutException e) {
// See HDFS-11682. NN may not get heartbeat to reflect the newest // See HDFS-11682. NN may not get heartbeat to reflect the newest
// block changes. // block changes.
@ -1628,6 +1650,103 @@ public class TestBalancer {
} }
} }
@Test
public void testBalancerWithExcludeListWithStripedFile() throws Exception {
Configuration conf = new Configuration();
initConfWithStripe(conf);
NameNodeConnector.setWrite2IdFile(true);
doTestBalancerWithExcludeListWithStripedFile(conf);
NameNodeConnector.setWrite2IdFile(false);
}
private void doTestBalancerWithExcludeListWithStripedFile(Configuration conf) throws Exception {
int numOfDatanodes = dataBlocks + parityBlocks + 5;
int numOfRacks = dataBlocks;
long capacity = 20 * defaultBlockSize;
long[] capacities = new long[numOfDatanodes];
Arrays.fill(capacities, capacity);
String[] racks = new String[numOfDatanodes];
for (int i = 0; i < numOfDatanodes; i++) {
racks[i] = "/rack" + (i % numOfRacks);
}
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numOfDatanodes)
.racks(racks)
.simulatedCapacities(capacities)
.build();
try {
cluster.waitActive();
client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
ClientProtocol.class).getProxy();
client.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
client.setErasureCodingPolicy("/",
StripedFileTestUtil.getDefaultECPolicy().getName());
long totalCapacity = sum(capacities);
// fill up the cluster with 30% data. It'll be 45% full plus parity.
long fileLen = totalCapacity * 3 / 10;
long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks;
FileSystem fs = cluster.getFileSystem(0);
DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, r.nextLong());
// verify locations of striped blocks
LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
// get datanode report
DatanodeInfo[] datanodeReport = client.getDatanodeReport(DatanodeReportType.ALL);
long totalBlocks = 0;
for (DatanodeInfo dn : datanodeReport) {
totalBlocks += dn.getNumBlocks();
}
// add datanode in new rack
String newRack = "/rack" + (++numOfRacks);
cluster.startDataNodes(conf, 2, true, null,
new String[]{newRack, newRack}, null,
new long[]{capacity, capacity});
totalCapacity += capacity*2;
cluster.triggerHeartbeats();
// add datanode to exclude list
Set<String> excludedList = new HashSet<>();
excludedList.add(datanodeReport[0].getXferAddr());
BalancerParameters.Builder pBuilder = new BalancerParameters.Builder();
pBuilder.setExcludedNodes(excludedList);
// start balancer and check the failed num of moving task
runBalancer(conf, totalUsedSpace, totalCapacity, pBuilder.build(),
excludedList.size(), false);
// check total blocks, max wait time 60s
final long blocksBeforeBalancer = totalBlocks;
GenericTestUtils.waitFor(() -> {
DatanodeInfo[] datanodeInfos = null;
try {
cluster.triggerHeartbeats();
datanodeInfos = client.getDatanodeReport(DatanodeReportType.ALL);
} catch (IOException e) {
Assert.fail(e.getMessage());
}
long blocksAfterBalancer = 0;
for (DatanodeInfo dn : datanodeInfos) {
blocksAfterBalancer += dn.getNumBlocks();
}
return blocksBeforeBalancer == blocksAfterBalancer;
}, 3000, 60000);
// verify locations of striped blocks
locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
} finally {
cluster.shutdown();
}
}
private void testNullStripedBlocks(Configuration conf) throws IOException { private void testNullStripedBlocks(Configuration conf) throws IOException {
NameNodeConnector nnc = NameNodeConnector.newNameNodeConnectors( NameNodeConnector nnc = NameNodeConnector.newNameNodeConnectors(
DFSUtil.getInternalNsRpcUris(conf), DFSUtil.getInternalNsRpcUris(conf),