HBASE-25184 Move RegionLocationFinder to hbase-balancer (#2543)

Signed-off-by: Yulin Niu <niuyulin@apache.org>
Duo Zhang 2021-04-10 21:10:53 +08:00 committed by GitHub
parent 75494108f8
commit f9e928e5a7
12 changed files with 423 additions and 304 deletions

View File

@@ -74,6 +74,12 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-logging</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-common</artifactId>
@@ -92,6 +98,11 @@
       <scope>compile</scope>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>

View File

@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.yetus.audience.InterfaceAudience;
/**
* This is the cluster we want to balance. It provides methods to let us get the information we
* want.
*/
@InterfaceAudience.Private
public interface ClusterInfoProvider {
/**
* Get all the regions of this cluster.
* <p/>
* Used to refresh region block locations on HDFS.
*/
List<RegionInfo> getAssignedRegions();
/**
* Get the table descriptor for the given table.
*/
TableDescriptor getTableDescriptor(TableName tableName) throws IOException;
/**
* Compute the block distribution for the given region.
* <p/>
* Used to refresh region block locations on HDFS.
*/
HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf,
TableDescriptor tableDescriptor, RegionInfo regionInfo) throws IOException;
}
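
The three methods above are all the finder needs, so it can now be driven without a running master. A minimal sketch of a standalone implementation, assuming fixed in-memory regions and a single descriptor (the class name and fields are illustrative, not part of this commit; the new unit test below does essentially the same thing with an anonymous class):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;

// Hypothetical provider backed by fixed in-memory data, for use outside a master.
class FixedClusterInfoProvider implements ClusterInfoProvider {
  private final List<RegionInfo> regions;
  private final TableDescriptor descriptor;

  FixedClusterInfoProvider(List<RegionInfo> regions, TableDescriptor descriptor) {
    this.regions = regions;
    this.descriptor = descriptor;
  }

  @Override
  public List<RegionInfo> getAssignedRegions() {
    return regions;
  }

  @Override
  public TableDescriptor getTableDescriptor(TableName tableName) throws IOException {
    return descriptor;
  }

  @Override
  public HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf,
      TableDescriptor tableDescriptor, RegionInfo regionInfo) throws IOException {
    return new HDFSBlocksDistribution(); // no HDFS here, report an empty distribution
  }
}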

View File

@@ -17,34 +17,33 @@
  */
 package org.apache.hadoop.hbase.master.balancer;
 
-import java.io.FileNotFoundException;
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Futures;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
@@ -52,55 +51,48 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
- * This will find where data for a region is located in HDFS. It ranks
- * {@link ServerName}'s by the size of the store files they are holding for a
- * given region.
- *
+ * This will find where data for a region is located in HDFS. It ranks {@link ServerName}'s by the
+ * size of the store files they are holding for a given region.
  */
 @InterfaceAudience.Private
-class RegionLocationFinder {
-  private static final Logger LOG = LoggerFactory.getLogger(RegionLocationFinder.class);
+class RegionHDFSBlockLocationFinder extends Configured {
+  private static final Logger LOG = LoggerFactory.getLogger(RegionHDFSBlockLocationFinder.class);
   private static final long CACHE_TIME = 240 * 60 * 1000;
-  private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = new HDFSBlocksDistribution();
-  private Configuration conf;
+  private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION =
+    new HDFSBlocksDistribution();
   private volatile ClusterMetrics status;
-  private MasterServices services;
+  private volatile ClusterInfoProvider provider;
   private final ListeningExecutorService executor;
   // Do not scheduleFullRefresh at master startup
   private long lastFullRefresh = EnvironmentEdgeManager.currentTime();
 
   private CacheLoader<RegionInfo, HDFSBlocksDistribution> loader =
     new CacheLoader<RegionInfo, HDFSBlocksDistribution>() {
       @Override
       public ListenableFuture<HDFSBlocksDistribution> reload(final RegionInfo hri,
         HDFSBlocksDistribution oldValue) throws Exception {
         return executor.submit(new Callable<HDFSBlocksDistribution>() {
           @Override
           public HDFSBlocksDistribution call() throws Exception {
             return internalGetTopBlockLocation(hri);
           }
         });
       }
 
       @Override
       public HDFSBlocksDistribution load(RegionInfo key) throws Exception {
         return internalGetTopBlockLocation(key);
       }
     };
 
   // The cache for where regions are located.
   private LoadingCache<RegionInfo, HDFSBlocksDistribution> cache = null;
 
-  RegionLocationFinder() {
+  RegionHDFSBlockLocationFinder() {
     this.cache = createCache();
-    executor = MoreExecutors.listeningDecorator(
-        Executors.newScheduledThreadPool(
-            5,
-            new ThreadFactoryBuilder().
-                setDaemon(true)
-                .setNameFormat("region-location-%d")
-                .build()));
+    executor = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(5,
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("region-location-%d").build()));
   }
 
   /**
@@ -108,99 +100,67 @@ class RegionLocationFinder {
    * @return A new Cache.
    */
   private LoadingCache<RegionInfo, HDFSBlocksDistribution> createCache() {
-    return CacheBuilder.newBuilder()
-        .expireAfterWrite(CACHE_TIME, TimeUnit.MILLISECONDS)
-        .build(loader);
+    return CacheBuilder.newBuilder().expireAfterWrite(CACHE_TIME, TimeUnit.MILLISECONDS)
+      .build(loader);
   }
 
-  public Configuration getConf() {
-    return conf;
+  void setClusterInfoProvider(ClusterInfoProvider provider) {
+    this.provider = provider;
   }
 
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  public void setServices(MasterServices services) {
-    this.services = services;
-  }
-
-  public void setClusterMetrics(ClusterMetrics status) {
+  void setClusterMetrics(ClusterMetrics status) {
     long currentTime = EnvironmentEdgeManager.currentTime();
     this.status = status;
     if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
       // Only count the refresh if it includes user tables ( eg more than meta and namespace ).
-      lastFullRefresh = scheduleFullRefresh()?currentTime:lastFullRefresh;
+      lastFullRefresh = scheduleFullRefresh() ? currentTime : lastFullRefresh;
     }
   }
 
   /**
    * Refresh all the region locations.
-   *
    * @return true if user created regions got refreshed.
    */
   private boolean scheduleFullRefresh() {
+    ClusterInfoProvider service = this.provider;
     // Protect from anything being null while starting up.
-    if (services == null) {
-      return false;
-    }
-    final AssignmentManager am = services.getAssignmentManager();
-    if (am == null) {
+    if (service == null) {
       return false;
     }
     // TODO: Should this refresh all the regions or only the ones assigned?
     boolean includesUserTables = false;
-    for (final RegionInfo hri : am.getAssignedRegions()) {
+    for (final RegionInfo hri : service.getAssignedRegions()) {
       cache.refresh(hri);
-      includesUserTables = includesUserTables || !hri.getTable().isSystemTable();
+      includesUserTables |= !hri.getTable().isSystemTable();
     }
     return includesUserTables;
   }
 
-  protected List<ServerName> getTopBlockLocations(RegionInfo region) {
+  List<ServerName> getTopBlockLocations(RegionInfo region) {
     List<String> topHosts = getBlockDistribution(region).getTopHosts();
     return mapHostNameToServerName(topHosts);
   }
 
   /**
-   * Returns an ordered list of hosts which have better locality for this region
-   * than the current host.
-   */
-  protected List<ServerName> getTopBlockLocations(RegionInfo region, String currentHost) {
-    HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
-    List<String> topHosts = new ArrayList<>();
-    for (String host : blocksDistribution.getTopHosts()) {
-      if (host.equals(currentHost)) {
-        break;
-      }
-      topHosts.add(host);
-    }
-    return mapHostNameToServerName(topHosts);
-  }
-
-  /**
-   * Returns an ordered list of hosts that are hosting the blocks for this
-   * region. The weight of each host is the sum of the block lengths of all
-   * files on that host, so the first host in the list is the server which holds
-   * the most bytes of the given region's HFiles.
-   *
+   * Returns an ordered list of hosts that are hosting the blocks for this region. The weight of
+   * each host is the sum of the block lengths of all files on that host, so the first host in the
+   * list is the server which holds the most bytes of the given region's HFiles.
    * @param region region
    * @return ordered list of hosts holding blocks of the specified region
    */
-  protected HDFSBlocksDistribution internalGetTopBlockLocation(RegionInfo region) {
+  private HDFSBlocksDistribution internalGetTopBlockLocation(RegionInfo region) {
     try {
       TableDescriptor tableDescriptor = getDescriptor(region.getTable());
       if (tableDescriptor != null) {
         HDFSBlocksDistribution blocksDistribution =
-          HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region);
+          provider.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region);
         return blocksDistribution;
       }
     } catch (IOException ioe) {
-      LOG.warn("IOException during HDFSBlocksDistribution computation. for " + "region = "
-          + region.getEncodedName(), ioe);
+      LOG.warn("IOException during HDFSBlocksDistribution computation. for " + "region = " +
        region.getEncodedName(), ioe);
     }
 
     return EMPTY_BLOCK_DISTRIBUTION;
@@ -208,44 +168,36 @@ class RegionLocationFinder {
 
   /**
    * return TableDescriptor for a given tableName
-   *
    * @param tableName the table name
-   * @return TableDescriptor
-   * @throws IOException
    */
-  protected TableDescriptor getDescriptor(TableName tableName) throws IOException {
-    TableDescriptor tableDescriptor = null;
-    try {
-      if (this.services != null && this.services.getTableDescriptors() != null) {
-        tableDescriptor = this.services.getTableDescriptors().get(tableName);
-      }
-    } catch (FileNotFoundException fnfe) {
-      LOG.debug("tableName={}", tableName, fnfe);
+  private TableDescriptor getDescriptor(TableName tableName) throws IOException {
+    ClusterInfoProvider service = this.provider;
+    if (service == null) {
+      return null;
     }
-
-    return tableDescriptor;
+    return service.getTableDescriptor(tableName);
   }
 
   /**
-   * Map hostname to ServerName, The output ServerName list will have the same
-   * order as input hosts.
-   *
+   * Map hostname to ServerName, The output ServerName list will have the same order as input hosts.
    * @param hosts the list of hosts
    * @return ServerName list
    */
-  protected List<ServerName> mapHostNameToServerName(List<String> hosts) {
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+    allowedOnPath = ".*/src/test/.*|.*/RegionHDFSBlockLocationFinder.java")
+  List<ServerName> mapHostNameToServerName(List<String> hosts) {
     if (hosts == null || status == null) {
       if (hosts == null) {
         LOG.warn("RegionLocationFinder top hosts is null");
       }
-      return Lists.newArrayList();
+      return Collections.emptyList();
     }
 
     List<ServerName> topServerNames = new ArrayList<>();
     Collection<ServerName> regionServers = status.getLiveServerMetrics().keySet();
 
     // create a mapping from hostname to ServerName for fast lookup
-    HashMap<String, List<ServerName>> hostToServerName = new HashMap<>();
+    Map<String, List<ServerName>> hostToServerName = new HashMap<>();
     for (ServerName sn : regionServers) {
       String host = sn.getHostname();
       if (!hostToServerName.containsKey(host)) {
@@ -269,7 +221,7 @@ class RegionLocationFinder {
     return topServerNames;
   }
 
-  public HDFSBlocksDistribution getBlockDistribution(RegionInfo hri) {
+  HDFSBlocksDistribution getBlockDistribution(RegionInfo hri) {
     HDFSBlocksDistribution blockDistbn = null;
     try {
       if (cache.asMap().containsKey(hri)) {
@@ -289,8 +241,7 @@ class RegionLocationFinder {
     }
   }
 
-  private ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(
-      RegionInfo hri) {
+  private ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(RegionInfo hri) {
     try {
       return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION);
     } catch (Exception e) {
@@ -298,29 +249,29 @@ class RegionLocationFinder {
     }
   }
 
-  public void refreshAndWait(Collection<RegionInfo> hris) {
-    ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures = new ArrayList<>(hris.size());
+  void refreshAndWait(Collection<RegionInfo> hris) {
+    ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures =
+      new ArrayList<>(hris.size());
     for (RegionInfo hregionInfo : hris) {
       regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo));
     }
     int index = 0;
     for (RegionInfo hregionInfo : hris) {
-      ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures
-          .get(index);
+      ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures.get(index);
       try {
        cache.put(hregionInfo, future.get());
       } catch (InterruptedException ite) {
         Thread.currentThread().interrupt();
       } catch (ExecutionException ee) {
-        LOG.debug(
-            "ExecutionException during HDFSBlocksDistribution computation. for region = "
-                + hregionInfo.getEncodedName(), ee);
+        LOG.debug("ExecutionException during HDFSBlocksDistribution computation. for region = " +
+          hregionInfo.getEncodedName(), ee);
       }
       index++;
     }
   }
 
-  // For test
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+    allowedOnPath = ".*/src/test/.*")
   LoadingCache<RegionInfo, HDFSBlocksDistribution> getCache() {
     return cache;
   }
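
Worth noting in the class above: load() computes a block distribution synchronously on first access, while reload() hands the recomputation to the executor and returns a ListenableFuture, so the cache.refresh(hri) calls made by scheduleFullRefresh() never block the balancer thread. A minimal sketch of that Guava pattern in isolation (plain Guava rather than the hbase-thirdparty shaded copy; expensiveLookup is a hypothetical stand-in for the HDFS block scan):

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

class AsyncReloadCacheSketch {
  private final ListeningExecutorService executor =
      MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));

  private final LoadingCache<String, Long> cache = CacheBuilder.newBuilder()
      .expireAfterWrite(240, TimeUnit.MINUTES) // same window as CACHE_TIME above
      .build(new CacheLoader<String, Long>() {
        @Override
        public Long load(String key) {
          // First access: computed synchronously on the caller's thread.
          return expensiveLookup(key);
        }

        @Override
        public ListenableFuture<Long> reload(String key, Long oldValue) {
          // refresh(): recompute in the background; readers keep seeing oldValue meanwhile.
          return executor.submit(() -> expensiveLookup(key));
        }
      });

  private long expensiveLookup(String key) { // hypothetical stand-in for the expensive work
    return key.length();
  }
}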

View File

@@ -0,0 +1,207 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, SmallTests.class })
public class TestRegionHDFSBlockLocationFinder {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionHDFSBlockLocationFinder.class);
private static TableDescriptor TD;
private static List<RegionInfo> REGIONS;
private RegionHDFSBlockLocationFinder finder;
private static HDFSBlocksDistribution generate(RegionInfo region) {
HDFSBlocksDistribution distribution = new HDFSBlocksDistribution();
int seed = region.hashCode();
Random rand = new Random(seed);
int size = 1 + rand.nextInt(10);
for (int i = 0; i < size; i++) {
distribution.addHostsAndBlockWeight(new String[] { "host-" + i }, 1 + rand.nextInt(100));
}
return distribution;
}
@BeforeClass
public static void setUpBeforeClass() {
TD = TableDescriptorBuilder.newBuilder(TableName.valueOf("RegionLocationFinder")).build();
int numRegions = 100;
REGIONS = new ArrayList<>(numRegions);
for (int i = 0; i < numRegions; i++) {
byte[] startKey = i == 0 ? HConstants.EMPTY_START_ROW : Bytes.toBytes(i);
byte[] endKey = i == numRegions - 1 ? HConstants.EMPTY_BYTE_ARRAY : Bytes.toBytes(i + 1);
RegionInfo region = RegionInfoBuilder.newBuilder(TD.getTableName()).setStartKey(startKey)
.setEndKey(endKey).build();
REGIONS.add(region);
}
}
@Before
public void setUp() {
finder = new RegionHDFSBlockLocationFinder();
finder.setClusterInfoProvider(new ClusterInfoProvider() {
@Override
public TableDescriptor getTableDescriptor(TableName tableName) throws IOException {
return TD;
}
@Override
public List<RegionInfo> getAssignedRegions() {
return REGIONS;
}
@Override
public HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf,
TableDescriptor tableDescriptor, RegionInfo regionInfo) throws IOException {
return generate(regionInfo);
}
});
}
@Test
public void testMapHostNameToServerName() throws Exception {
assertTrue(finder.mapHostNameToServerName(null).isEmpty());
List<String> hosts = new ArrayList<>();
for (int i = 0; i < 10; i += 2) {
hosts.add("host-" + i);
}
assertTrue(finder.mapHostNameToServerName(hosts).isEmpty());
Map<ServerName, ServerMetrics> serverMetrics = new HashMap<>();
for (int i = 0; i < 10; i += 2) {
ServerName sn = ServerName.valueOf("host-" + i, 12345, 12345);
serverMetrics.put(sn, null);
}
ClusterMetrics metrics = mock(ClusterMetrics.class);
when(metrics.getLiveServerMetrics()).thenReturn(serverMetrics);
finder.setClusterMetrics(metrics);
List<ServerName> sns = finder.mapHostNameToServerName(hosts);
assertEquals(5, sns.size());
for (int i = 0; i < 5; i++) {
ServerName sn = sns.get(i);
assertEquals("host-" + (2 * i), sn.getHostname());
assertEquals(12345, sn.getPort());
assertEquals(12345, sn.getStartcode());
}
}
@Test
public void testRefreshAndWait() throws Exception {
finder.getCache().invalidateAll();
for (RegionInfo region : REGIONS) {
assertNull(finder.getCache().getIfPresent(region));
}
finder.refreshAndWait(REGIONS);
for (RegionInfo region : REGIONS) {
assertNotNull(finder.getCache().getIfPresent(region));
}
}
private void assertHostAndWeightEquals(HDFSBlocksDistribution expected,
HDFSBlocksDistribution actual) {
Map<String, HostAndWeight> expectedMap = expected.getHostAndWeights();
Map<String, HostAndWeight> actualMap = actual.getHostAndWeights();
assertEquals(expectedMap.size(), actualMap.size());
expectedMap.forEach((k, expectedHostAndWeight) -> {
HostAndWeight actualHostAndWeight = actualMap.get(k);
assertEquals(expectedHostAndWeight.getHost(), actualHostAndWeight.getHost());
assertEquals(expectedHostAndWeight.getWeight(), actualHostAndWeight.getWeight());
assertEquals(expectedHostAndWeight.getWeightForSsd(), actualHostAndWeight.getWeightForSsd());
});
}
@Test
public void testGetBlockDistribution() {
Map<RegionInfo, HDFSBlocksDistribution> cache = new HashMap<>();
for (RegionInfo region : REGIONS) {
HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
assertHostAndWeightEquals(generate(region), hbd);
cache.put(region, hbd);
}
// the instance should be cached
for (RegionInfo region : REGIONS) {
HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
assertSame(cache.get(region), hbd);
}
}
@Test
public void testGetTopBlockLocations() {
Map<ServerName, ServerMetrics> serverMetrics = new HashMap<>();
for (int i = 0; i < 10; i++) {
ServerName sn = ServerName.valueOf("host-" + i, 12345, 12345);
serverMetrics.put(sn, null);
}
ClusterMetrics metrics = mock(ClusterMetrics.class);
when(metrics.getLiveServerMetrics()).thenReturn(serverMetrics);
finder.setClusterMetrics(metrics);
for (RegionInfo region : REGIONS) {
List<ServerName> servers = finder.getTopBlockLocations(region);
long previousWeight = Long.MAX_VALUE;
HDFSBlocksDistribution hbd = generate(region);
for (ServerName server : servers) {
long weight = hbd.getWeight(server.getHostname());
assertTrue(weight <= previousWeight);
previousWeight = weight;
}
}
}
}

View File

@@ -62,14 +62,14 @@ public class TestHDFSBlocksDistribution {
       , distribution.getHostAndWeights().get("test").getWeightForSsd());
   }
 
-  public class MockHDFSBlocksDistribution extends HDFSBlocksDistribution {
+  private static final class MockHDFSBlocksDistribution extends HDFSBlocksDistribution {
     @Override
-    public Map<String,HostAndWeight> getHostAndWeights() {
+    public Map<String, HostAndWeight> getHostAndWeights() {
       HashMap<String, HostAndWeight> map = new HashMap<>();
       map.put("test", new HostAndWeight(null, 100, 0));
       return map;
     }
   }
 
   @Test

View File

@@ -88,7 +88,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
   static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
     = load -> load.getRegionMetrics().isEmpty();
 
-  protected RegionLocationFinder regionFinder;
+  protected RegionHDFSBlockLocationFinder regionFinder;
   protected boolean useRegionFinder;
   protected boolean isByTable = false;
 
@@ -124,7 +124,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
   private void createRegionFinder() {
     useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
     if (useRegionFinder) {
-      regionFinder = new RegionLocationFinder();
+      regionFinder = new RegionHDFSBlockLocationFinder();
     }
   }
 
@@ -147,7 +147,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     ArrayList<String> tables;
     RegionInfo[] regions;
     Deque<BalancerRegionLoad>[] regionLoads;
-    private RegionLocationFinder regionFinder;
+    private RegionHDFSBlockLocationFinder regionFinder;
 
     int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
 
@@ -200,7 +200,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     protected Cluster(
         Map<ServerName, List<RegionInfo>> clusterState,
         Map<String, Deque<BalancerRegionLoad>> loads,
-        RegionLocationFinder regionFinder,
+        RegionHDFSBlockLocationFinder regionFinder,
         RackManager rackManager) {
       this(null, clusterState, loads, regionFinder, rackManager);
     }
 
@@ -210,7 +210,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
         Collection<RegionInfo> unassignedRegions,
         Map<ServerName, List<RegionInfo>> clusterState,
         Map<String, Deque<BalancerRegionLoad>> loads,
-        RegionLocationFinder regionFinder,
+        RegionHDFSBlockLocationFinder regionFinder,
         RackManager rackManager) {
       if (unassignedRegions == null) {
 
@@ -476,7 +476,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     /** Helper for Cluster constructor to handle a region */
     private void registerRegion(RegionInfo region, int regionIndex,
         int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads,
-        RegionLocationFinder regionFinder) {
+        RegionHDFSBlockLocationFinder regionFinder) {
       String tableName = region.getTable().getNameAsString();
       if (!tablesToIndex.containsKey(tableName)) {
         tables.add(tableName);
 
@@ -1185,7 +1185,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     masterServerName = masterServices.getServerName();
     this.services = masterServices;
     if (useRegionFinder) {
-      this.regionFinder.setServices(masterServices);
+      this.regionFinder.setClusterInfoProvider(new MasterClusterInfoProvider(services));
     }
     if (this.services.isInMaintenanceMode()) {
       this.maintenanceMode = true;

View File

@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Master based cluster info provider.
*/
@InterfaceAudience.Private
class MasterClusterInfoProvider implements ClusterInfoProvider {
private final MasterServices services;
MasterClusterInfoProvider(MasterServices services) {
this.services = services;
}
@Override
public List<RegionInfo> getAssignedRegions() {
AssignmentManager am = services.getAssignmentManager();
return am != null ? am.getAssignedRegions() : Collections.emptyList();
}
@Override
public TableDescriptor getTableDescriptor(TableName tableName) throws IOException {
TableDescriptors tds = services.getTableDescriptors();
return tds != null ? tds.get(tableName) : null;
}
@Override
public HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf,
TableDescriptor tableDescriptor, RegionInfo regionInfo) throws IOException {
return HRegion.computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo);
}
}
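
The null guards here matter: the AssignmentManager and the TableDescriptors may not exist yet while the master is starting up, which is the same window the finder previously protected against by checking its MasterServices field directly. With those lookups behind the provider, hooking the finder up to a live master reduces to the single call already visible in the BaseLoadBalancer hunk above:

  regionFinder.setClusterInfoProvider(new MasterClusterInfoProvider(services));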

View File

@@ -398,7 +398,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     // instantiating the storefile infos can be quite expensive.
     // Allow turning this feature off if the locality cost is not going to
     // be used in any computations.
-    RegionLocationFinder finder = null;
+    RegionHDFSBlockLocationFinder finder = null;
     if ((this.localityCost != null && this.localityCost.getMultiplier() > 0)
       || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) {
       finder = this.regionFinder;
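
Since scanning store files for block locations is the expensive part, the finder is only handed to the cluster model when a locality-related cost function can actually use it. A hedged configuration sketch for taking locality out of the picture entirely: hbase.master.balancer.uselocality comes from the BaseLoadBalancer hunk above, while the two stochastic multiplier keys are the usual HBase 2.x names and should be double-checked against your release.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

class DisableLocalityCostSketch {
  static Configuration disableLocality() {
    Configuration conf = HBaseConfiguration.create();
    // Skip creating the region finder altogether.
    conf.setBoolean("hbase.master.balancer.uselocality", false);
    // Zero the multipliers so the locality cost functions drop out of the computation.
    conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0.0f);
    conf.setFloat("hbase.master.balancer.stochastic.rackLocalityCost", 0.0f);
    return conf;
  }
}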

View File

@@ -524,7 +524,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
     assignRegions(regions, servers, clusterState);
 
     // mock block locality for some regions
-    RegionLocationFinder locationFinder = mock(RegionLocationFinder.class);
+    RegionHDFSBlockLocationFinder locationFinder = mock(RegionHDFSBlockLocationFinder.class);
     // block locality: region:0 => {server:0}
     // region:1 => {server:0, server:1}
     // region:42 => {server:4, server:9, server:5}

View File

@@ -180,10 +180,11 @@ public class TestFavoredStochasticBalancerPickers extends BalancerTestBase {
         serverAssignments.put(sn, getTableRegionsFromServer(tableName, sn));
       }
     }
-    RegionLocationFinder regionFinder = new RegionLocationFinder();
+    RegionHDFSBlockLocationFinder regionFinder = new RegionHDFSBlockLocationFinder();
     regionFinder.setClusterMetrics(admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
     regionFinder.setConf(conf);
-    regionFinder.setServices(TEST_UTIL.getMiniHBaseCluster().getMaster());
+    regionFinder.setClusterInfoProvider(
+      new MasterClusterInfoProvider(TEST_UTIL.getMiniHBaseCluster().getMaster()));
     Cluster cluster = new Cluster(serverAssignments, null, regionFinder, new RackManager(conf));
     LoadOnlyFavoredStochasticBalancer balancer = (LoadOnlyFavoredStochasticBalancer) TEST_UTIL
         .getMiniHBaseCluster().getMaster().getLoadBalancer().getInternalBalancer();
@@ -196,8 +197,9 @@ public class TestFavoredStochasticBalancerPickers extends BalancerTestBase {
       LOG.error("Most loaded server: " + mostLoadedServer + " does not match: "
           + cluster.servers[servers[servers.length -1]]);
     }
-    assertEquals(mostLoadedServer, cluster.servers[servers[servers.length -1]]);
-    FavoredStochasticBalancer.FavoredNodeLoadPicker loadPicker = balancer.new FavoredNodeLoadPicker();
+    assertEquals(mostLoadedServer, cluster.servers[servers[servers.length - 1]]);
+    FavoredStochasticBalancer.FavoredNodeLoadPicker loadPicker =
+      balancer.new FavoredNodeLoadPicker();
     boolean userRegionPicked = false;
     for (int i = 0; i < 100; i++) {
       if (userRegionPicked) {

View File

@@ -1,170 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({MasterTests.class, MediumTests.class})
public class TestRegionLocationFinder {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionLocationFinder.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static MiniHBaseCluster cluster;
private final static TableName tableName = TableName.valueOf("table");
private final static byte[] FAMILY = Bytes.toBytes("cf");
private static Table table;
private final static int ServerNum = 5;
private static RegionLocationFinder finder = new RegionLocationFinder();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
cluster = TEST_UTIL.startMiniCluster(ServerNum);
table = TEST_UTIL.createTable(tableName, FAMILY, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
TEST_UTIL.waitTableAvailable(tableName, 1000);
TEST_UTIL.loadTable(table, FAMILY);
for (int i = 0; i < ServerNum; i++) {
HRegionServer server = cluster.getRegionServer(i);
for (HRegion region : server.getRegions(tableName)) {
region.flush(true);
}
}
finder.setConf(TEST_UTIL.getConfiguration());
finder.setServices(cluster.getMaster());
finder.setClusterMetrics(cluster.getMaster().getClusterMetrics());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
table.close();
TEST_UTIL.deleteTable(tableName);
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testInternalGetTopBlockLocation() throws Exception {
for (int i = 0; i < ServerNum; i++) {
HRegionServer server = cluster.getRegionServer(i);
for (HRegion region : server.getRegions(tableName)) {
// get region's hdfs block distribution by region and RegionLocationFinder,
// they should have same result
HDFSBlocksDistribution blocksDistribution1 = region.getHDFSBlocksDistribution();
HDFSBlocksDistribution blocksDistribution2 = finder.getBlockDistribution(region
.getRegionInfo());
assertEquals(blocksDistribution1.getUniqueBlocksTotalWeight(),
blocksDistribution2.getUniqueBlocksTotalWeight());
if (blocksDistribution1.getUniqueBlocksTotalWeight() != 0) {
assertEquals(blocksDistribution1.getTopHosts().get(0), blocksDistribution2.getTopHosts()
.get(0));
}
}
}
}
@Test
public void testMapHostNameToServerName() throws Exception {
List<String> topHosts = new ArrayList<>();
for (int i = 0; i < ServerNum; i++) {
HRegionServer server = cluster.getRegionServer(i);
String serverHost = server.getServerName().getHostname();
if (!topHosts.contains(serverHost)) {
topHosts.add(serverHost);
}
}
List<ServerName> servers = finder.mapHostNameToServerName(topHosts);
// mini cluster, all rs in one host
assertEquals(1, topHosts.size());
for (int i = 0; i < ServerNum; i++) {
ServerName server = cluster.getRegionServer(i).getServerName();
assertTrue(servers.contains(server));
}
}
@Test
public void testGetTopBlockLocations() throws Exception {
for (int i = 0; i < ServerNum; i++) {
HRegionServer server = cluster.getRegionServer(i);
for (HRegion region : server.getRegions(tableName)) {
List<ServerName> servers = finder.getTopBlockLocations(region
.getRegionInfo());
// test table may have empty region
if (region.getHDFSBlocksDistribution().getUniqueBlocksTotalWeight() == 0) {
continue;
}
List<String> topHosts = region.getHDFSBlocksDistribution().getTopHosts();
// rs and datanode may have different host in local machine test
if (!topHosts.contains(server.getServerName().getHostname())) {
continue;
}
for (int j = 0; j < ServerNum; j++) {
ServerName serverName = cluster.getRegionServer(j).getServerName();
assertTrue(servers.contains(serverName));
}
}
}
}
@Test
public void testRefreshAndWait() throws Exception {
finder.getCache().invalidateAll();
for (int i = 0; i < ServerNum; i++) {
HRegionServer server = cluster.getRegionServer(i);
List<HRegion> regions = server.getRegions(tableName);
if (regions.size() <= 0) {
continue;
}
List<RegionInfo> regionInfos = new ArrayList<>(regions.size());
for (HRegion region : regions) {
regionInfos.add(region.getRegionInfo());
}
finder.refreshAndWait(regionInfos);
for (RegionInfo regionInfo : regionInfos) {
assertNotNull(finder.getCache().getIfPresent(regionInfo));
}
}
}
}