HDFS-14979 Allow Balancer to submit getBlocks calls to Observer Nodes when possible. Contributed by Erik Krogen.

(cherry picked from 586defe711)
(cherry picked from dec765b329)
Erik Krogen 2019-11-11 14:32:51 -08:00 committed by Erik Krogen
parent b4f39c2a8a
commit 5cf36aa2b1
2 changed files with 23 additions and 0 deletions
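
Observer reads are enabled on the client side by configuring ObserverReadProxyProvider as the failover proxy provider for the nameservice; once getBlocks carries @ReadOnly (which is what this patch adds), those calls become eligible to be served by an Observer NameNode. The following is a minimal configuration sketch, not part of the patch; the nameservice name "mycluster" and the NameNode IDs are placeholder assumptions.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class ObserverReadConfSketch {
      // Builds a client Configuration that routes RPCs through the
      // observer-aware proxy provider for a hypothetical nameservice.
      public static Configuration observerReadConf() {
        Configuration conf = new HdfsConfiguration();
        conf.set("dfs.nameservices", "mycluster");
        conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2,nn3");
        conf.set("dfs.client.failover.proxy.provider.mycluster",
            "org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider");
        return conf;
      }
    }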

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
 import org.apache.hadoop.io.retry.AtMostOnce;
 import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.security.KerberosInfo;
@@ -77,6 +78,7 @@ public interface NamenodeProtocol {
    *           datanode does not exist
    */
   @Idempotent
+  @ReadOnly
   BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
       minBlockSize) throws IOException;
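
The @ReadOnly annotation is what an observer-aware proxy provider inspects, via reflection, to decide whether a call may go to an Observer rather than the Active NameNode. Below is an illustrative sketch of that check, not Hadoop's actual dispatch code; the class and helper names are hypothetical.

    import java.lang.reflect.Method;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
    import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;

    public class ReadOnlyDispatchSketch {
      // True if the interface method carries @ReadOnly and so may be
      // served by an Observer NameNode.
      static boolean isObserverEligible(Method method) {
        return method.isAnnotationPresent(ReadOnly.class);
      }

      public static void main(String[] args) throws Exception {
        // With this patch applied, getBlocks is annotated and the check passes.
        Method getBlocks = NamenodeProtocol.class.getMethod(
            "getBlocks", DatanodeInfo.class, long.class, long.class);
        System.out.println("getBlocks observer-eligible: "
            + isObserverEligible(getBlocks));
      }
    }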

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java

@@ -18,8 +18,15 @@
 package org.apache.hadoop.hdfs.server.balancer;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -33,6 +40,8 @@ import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
 import org.junit.Test;
@@ -128,12 +137,24 @@ public class TestBalancerWithHANameNodes {
       cluster = qjmhaCluster.getDfsCluster();
       cluster.waitClusterUp();
       cluster.waitActive();
+      List<FSNamesystem> namesystemSpies = new ArrayList<>();
+      for (int i = 0; i < cluster.getNumNameNodes(); i++) {
+        namesystemSpies.add(
+            NameNodeAdapter.spyOnNamesystem(cluster.getNameNode(i)));
+      }
       DistributedFileSystem dfs = HATestUtil.configureObserverReadFs(
           cluster, conf, ObserverReadProxyProvider.class, true);
       client = dfs.getClient().getNamenode();
       doTest(conf);
+      for (int i = 0; i < cluster.getNumNameNodes(); i++) {
+        // First observer node is at idx 2 so it should get both getBlocks calls
+        // all other NameNodes should see 0 getBlocks calls
+        int expectedCount = (i == 2) ? 2 : 0;
+        verify(namesystemSpies.get(i), times(expectedCount))
+            .getBlocks(any(), anyLong(), anyLong());
+      }
     } finally {
       if (qjmhaCluster != null) {
         qjmhaCluster.shutdown();
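
The new assertions rely on a standard Mockito pattern: spy on a real object, exercise it, then check exact call counts with verify(..., times(n)). A stripped-down, self-contained sketch of that idiom follows; the BlockSource class and its getBlocks method are purely illustrative stand-ins.

    import static org.mockito.Matchers.anyLong;
    import static org.mockito.Mockito.spy;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;

    public class SpyVerifySketch {
      static class BlockSource {
        long getBlocks(long size) {
          return size;
        }
      }

      public static void main(String[] args) {
        // Wrap a real instance so calls are both executed and recorded.
        BlockSource source = spy(new BlockSource());
        source.getBlocks(100L);
        source.getBlocks(200L);
        // Fails unless exactly two getBlocks calls reached the spy.
        verify(source, times(2)).getBlocks(anyLong());
      }
    }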