Revert "HDFS-16333. fix balancer bug when transfer an EC block (#3679)"
This reverts commit 2072a6a476.
This commit is contained in:
parent
2072a6a476
commit
2315849b8f
|
@ -490,7 +490,7 @@ public class Dispatcher {
|
||||||
|
|
||||||
public static class DBlockStriped extends DBlock {
|
public static class DBlockStriped extends DBlock {
|
||||||
|
|
||||||
private byte[] indices;
|
final byte[] indices;
|
||||||
final short dataBlockNum;
|
final short dataBlockNum;
|
||||||
final int cellSize;
|
final int cellSize;
|
||||||
|
|
||||||
|
@ -527,29 +527,6 @@ public class Dispatcher {
|
||||||
}
|
}
|
||||||
return block.getNumBytes();
|
return block.getNumBytes();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setIndices(byte[] indices) {
|
|
||||||
this.indices = indices;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Adjust EC block indices,it will remove the element of adjustList from indices.
|
|
||||||
* @param adjustList the list will be removed from indices
|
|
||||||
*/
|
|
||||||
public void adjustIndices(List<Integer> adjustList) {
|
|
||||||
if (adjustList.isEmpty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
byte[] newIndices = new byte[indices.length - adjustList.size()];
|
|
||||||
for (int i = 0, j = 0; i < indices.length; ++i) {
|
|
||||||
if (!adjustList.contains(i)) {
|
|
||||||
newIndices[j] = indices[i];
|
|
||||||
++j;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this.indices = newIndices;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** The class represents a desired move. */
|
/** The class represents a desired move. */
|
||||||
|
@ -826,7 +803,7 @@ public class Dispatcher {
|
||||||
*
|
*
|
||||||
* @return the total size of the received blocks in the number of bytes.
|
* @return the total size of the received blocks in the number of bytes.
|
||||||
*/
|
*/
|
||||||
private long getBlockList() throws IOException, IllegalArgumentException {
|
private long getBlockList() throws IOException {
|
||||||
final long size = Math.min(getBlocksSize, blocksToReceive);
|
final long size = Math.min(getBlocksSize, blocksToReceive);
|
||||||
final BlocksWithLocations newBlksLocs =
|
final BlocksWithLocations newBlksLocs =
|
||||||
nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize);
|
nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize);
|
||||||
|
@ -863,14 +840,7 @@ public class Dispatcher {
|
||||||
synchronized (block) {
|
synchronized (block) {
|
||||||
block.clearLocations();
|
block.clearLocations();
|
||||||
|
|
||||||
if (blkLocs instanceof StripedBlockWithLocations) {
|
|
||||||
// EC block may adjust indices before, avoid repeated adjustments
|
|
||||||
((DBlockStriped) block).setIndices(
|
|
||||||
((StripedBlockWithLocations) blkLocs).getIndices());
|
|
||||||
}
|
|
||||||
|
|
||||||
// update locations
|
// update locations
|
||||||
List<Integer> adjustList = new ArrayList<>();
|
|
||||||
final String[] datanodeUuids = blkLocs.getDatanodeUuids();
|
final String[] datanodeUuids = blkLocs.getDatanodeUuids();
|
||||||
final StorageType[] storageTypes = blkLocs.getStorageTypes();
|
final StorageType[] storageTypes = blkLocs.getStorageTypes();
|
||||||
for (int i = 0; i < datanodeUuids.length; i++) {
|
for (int i = 0; i < datanodeUuids.length; i++) {
|
||||||
|
@ -878,20 +848,8 @@ public class Dispatcher {
|
||||||
datanodeUuids[i], storageTypes[i]);
|
datanodeUuids[i], storageTypes[i]);
|
||||||
if (g != null) { // not unknown
|
if (g != null) { // not unknown
|
||||||
block.addLocation(g);
|
block.addLocation(g);
|
||||||
} else if (blkLocs instanceof StripedBlockWithLocations) {
|
|
||||||
// some datanode may not in storageGroupMap due to decommission operation
|
|
||||||
// or balancer cli with "-exclude" parameter
|
|
||||||
adjustList.add(i);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!adjustList.isEmpty()) {
|
|
||||||
// block.locations mismatch with block.indices
|
|
||||||
// adjust indices to get correct internalBlock for Datanode in #getInternalBlock
|
|
||||||
((DBlockStriped) block).adjustIndices(adjustList);
|
|
||||||
Preconditions.checkArgument(((DBlockStriped) block).indices.length
|
|
||||||
== block.locations.size());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
|
if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
|
@ -1011,7 +969,7 @@ public class Dispatcher {
|
||||||
}
|
}
|
||||||
blocksToReceive -= received;
|
blocksToReceive -= received;
|
||||||
continue;
|
continue;
|
||||||
} catch (IOException | IllegalArgumentException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Exception while getting reportedBlock list", e);
|
LOG.warn("Exception while getting reportedBlock list", e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -468,19 +468,6 @@ public class TestBalancer {
|
||||||
static void waitForBalancer(long totalUsedSpace, long totalCapacity,
|
static void waitForBalancer(long totalUsedSpace, long totalCapacity,
|
||||||
ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
|
ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
|
||||||
int expectedExcludedNodes) throws IOException, TimeoutException {
|
int expectedExcludedNodes) throws IOException, TimeoutException {
|
||||||
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wait until balanced: each datanode gives utilization within.
|
|
||||||
* BALANCE_ALLOWED_VARIANCE of average
|
|
||||||
* @throws IOException
|
|
||||||
* @throws TimeoutException
|
|
||||||
*/
|
|
||||||
static void waitForBalancer(long totalUsedSpace, long totalCapacity,
|
|
||||||
ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
|
|
||||||
int expectedExcludedNodes, boolean checkExcludeNodesUtilization)
|
|
||||||
throws IOException, TimeoutException {
|
|
||||||
long timeout = TIMEOUT;
|
long timeout = TIMEOUT;
|
||||||
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
|
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
|
||||||
: Time.monotonicNow() + timeout;
|
: Time.monotonicNow() + timeout;
|
||||||
|
@ -502,9 +489,7 @@ public class TestBalancer {
|
||||||
double nodeUtilization = ((double)datanode.getDfsUsed())
|
double nodeUtilization = ((double)datanode.getDfsUsed())
|
||||||
/ datanode.getCapacity();
|
/ datanode.getCapacity();
|
||||||
if (Dispatcher.Util.isExcluded(p.getExcludedNodes(), datanode)) {
|
if (Dispatcher.Util.isExcluded(p.getExcludedNodes(), datanode)) {
|
||||||
if (checkExcludeNodesUtilization) {
|
assertTrue(nodeUtilization == 0);
|
||||||
assertTrue(nodeUtilization == 0);
|
|
||||||
}
|
|
||||||
actualExcludedNodeCount++;
|
actualExcludedNodeCount++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -789,12 +774,6 @@ public class TestBalancer {
|
||||||
private void runBalancer(Configuration conf, long totalUsedSpace,
|
private void runBalancer(Configuration conf, long totalUsedSpace,
|
||||||
long totalCapacity, BalancerParameters p, int excludedNodes)
|
long totalCapacity, BalancerParameters p, int excludedNodes)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
runBalancer(conf, totalUsedSpace, totalCapacity, p, excludedNodes, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void runBalancer(Configuration conf, long totalUsedSpace,
|
|
||||||
long totalCapacity, BalancerParameters p, int excludedNodes,
|
|
||||||
boolean checkExcludeNodesUtilization) throws Exception {
|
|
||||||
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
||||||
|
|
||||||
int retry = 5;
|
int retry = 5;
|
||||||
|
@ -815,7 +794,7 @@ public class TestBalancer {
|
||||||
LOG.info(" .");
|
LOG.info(" .");
|
||||||
try {
|
try {
|
||||||
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p,
|
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p,
|
||||||
excludedNodes, checkExcludeNodesUtilization);
|
excludedNodes);
|
||||||
} catch (TimeoutException e) {
|
} catch (TimeoutException e) {
|
||||||
// See HDFS-11682. NN may not get heartbeat to reflect the newest
|
// See HDFS-11682. NN may not get heartbeat to reflect the newest
|
||||||
// block changes.
|
// block changes.
|
||||||
|
@ -1649,103 +1628,6 @@ public class TestBalancer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBalancerWithExcludeListWithStripedFile() throws Exception {
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
initConfWithStripe(conf);
|
|
||||||
NameNodeConnector.setWrite2IdFile(true);
|
|
||||||
doTestBalancerWithExcludeListWithStripedFile(conf);
|
|
||||||
NameNodeConnector.setWrite2IdFile(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void doTestBalancerWithExcludeListWithStripedFile(Configuration conf) throws Exception {
|
|
||||||
int numOfDatanodes = dataBlocks + parityBlocks + 5;
|
|
||||||
int numOfRacks = dataBlocks;
|
|
||||||
long capacity = 20 * defaultBlockSize;
|
|
||||||
long[] capacities = new long[numOfDatanodes];
|
|
||||||
Arrays.fill(capacities, capacity);
|
|
||||||
String[] racks = new String[numOfDatanodes];
|
|
||||||
for (int i = 0; i < numOfDatanodes; i++) {
|
|
||||||
racks[i] = "/rack" + (i % numOfRacks);
|
|
||||||
}
|
|
||||||
cluster = new MiniDFSCluster.Builder(conf)
|
|
||||||
.numDataNodes(numOfDatanodes)
|
|
||||||
.racks(racks)
|
|
||||||
.simulatedCapacities(capacities)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
try {
|
|
||||||
cluster.waitActive();
|
|
||||||
client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
|
|
||||||
ClientProtocol.class).getProxy();
|
|
||||||
client.enableErasureCodingPolicy(
|
|
||||||
StripedFileTestUtil.getDefaultECPolicy().getName());
|
|
||||||
client.setErasureCodingPolicy("/",
|
|
||||||
StripedFileTestUtil.getDefaultECPolicy().getName());
|
|
||||||
|
|
||||||
long totalCapacity = sum(capacities);
|
|
||||||
|
|
||||||
// fill up the cluster with 30% data. It'll be 45% full plus parity.
|
|
||||||
long fileLen = totalCapacity * 3 / 10;
|
|
||||||
long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks;
|
|
||||||
FileSystem fs = cluster.getFileSystem(0);
|
|
||||||
DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, r.nextLong());
|
|
||||||
|
|
||||||
// verify locations of striped blocks
|
|
||||||
LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
|
|
||||||
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
|
|
||||||
|
|
||||||
// get datanode report
|
|
||||||
DatanodeInfo[] datanodeReport = client.getDatanodeReport(DatanodeReportType.ALL);
|
|
||||||
long totalBlocks = 0;
|
|
||||||
for (DatanodeInfo dn : datanodeReport) {
|
|
||||||
totalBlocks += dn.getNumBlocks();
|
|
||||||
}
|
|
||||||
|
|
||||||
// add datanode in new rack
|
|
||||||
String newRack = "/rack" + (++numOfRacks);
|
|
||||||
cluster.startDataNodes(conf, 2, true, null,
|
|
||||||
new String[]{newRack, newRack}, null,
|
|
||||||
new long[]{capacity, capacity});
|
|
||||||
totalCapacity += capacity*2;
|
|
||||||
cluster.triggerHeartbeats();
|
|
||||||
|
|
||||||
// add datanode to exclude list
|
|
||||||
Set<String> excludedList = new HashSet<>();
|
|
||||||
excludedList.add(datanodeReport[0].getXferAddr());
|
|
||||||
BalancerParameters.Builder pBuilder = new BalancerParameters.Builder();
|
|
||||||
pBuilder.setExcludedNodes(excludedList);
|
|
||||||
|
|
||||||
// start balancer and check the failed num of moving task
|
|
||||||
runBalancer(conf, totalUsedSpace, totalCapacity, pBuilder.build(),
|
|
||||||
excludedList.size(), false);
|
|
||||||
|
|
||||||
// check total blocks, max wait time 60s
|
|
||||||
final long blocksBeforeBalancer = totalBlocks;
|
|
||||||
GenericTestUtils.waitFor(() -> {
|
|
||||||
DatanodeInfo[] datanodeInfos = null;
|
|
||||||
try {
|
|
||||||
cluster.triggerHeartbeats();
|
|
||||||
datanodeInfos = client.getDatanodeReport(DatanodeReportType.ALL);
|
|
||||||
} catch (IOException e) {
|
|
||||||
Assert.fail(e.getMessage());
|
|
||||||
}
|
|
||||||
long blocksAfterBalancer = 0;
|
|
||||||
for (DatanodeInfo dn : datanodeInfos) {
|
|
||||||
blocksAfterBalancer += dn.getNumBlocks();
|
|
||||||
}
|
|
||||||
return blocksBeforeBalancer == blocksAfterBalancer;
|
|
||||||
}, 3000, 60000);
|
|
||||||
|
|
||||||
// verify locations of striped blocks
|
|
||||||
locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
|
|
||||||
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
|
|
||||||
|
|
||||||
} finally {
|
|
||||||
cluster.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void testNullStripedBlocks(Configuration conf) throws IOException {
|
private void testNullStripedBlocks(Configuration conf) throws IOException {
|
||||||
NameNodeConnector nnc = NameNodeConnector.newNameNodeConnectors(
|
NameNodeConnector nnc = NameNodeConnector.newNameNodeConnectors(
|
||||||
DFSUtil.getInternalNsRpcUris(conf),
|
DFSUtil.getInternalNsRpcUris(conf),
|
||||||
|
|
Loading…
Reference in New Issue