HBASE-17931 Assign system tables to servers with highest version
This commit is contained in:
parent
743f454d25
commit
3381c6c453
|
@ -35,6 +35,10 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
public class VersionInfo {
|
public class VersionInfo {
|
||||||
private static final Log LOG = LogFactory.getLog(VersionInfo.class.getName());
|
private static final Log LOG = LogFactory.getLog(VersionInfo.class.getName());
|
||||||
|
|
||||||
|
// If between two dots there is not a number, we regard it as a very large number so it is
|
||||||
|
// higher than any numbers in the version.
|
||||||
|
private static int VERY_LARGE_NUMBER = 100000;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the hbase version.
|
* Get the hbase version.
|
||||||
* @return the hbase version string, eg. "0.6.3-dev"
|
* @return the hbase version string, eg. "0.6.3-dev"
|
||||||
|
@ -110,6 +114,46 @@ public class VersionInfo {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static int compareVersion(String v1, String v2) {
|
||||||
|
//fast compare equals first
|
||||||
|
if (v1.equals(v2)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
String s1[] = v1.split("\\.|-");//1.2.3-hotfix -> [1, 2, 3, hotfix]
|
||||||
|
String s2[] = v2.split("\\.|-");
|
||||||
|
int index = 0;
|
||||||
|
while (index < s1.length && index < s2.length) {
|
||||||
|
int va = VERY_LARGE_NUMBER, vb = VERY_LARGE_NUMBER;
|
||||||
|
try {
|
||||||
|
va = Integer.parseInt(s1[index]);
|
||||||
|
} catch (Exception ingore) {
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
vb = Integer.parseInt(s2[index]);
|
||||||
|
} catch (Exception ingore) {
|
||||||
|
}
|
||||||
|
if (va != vb) {
|
||||||
|
return va - vb;
|
||||||
|
}
|
||||||
|
if (va == VERY_LARGE_NUMBER) {
|
||||||
|
// compare as String
|
||||||
|
int c = s1[index].compareTo(s2[index]);
|
||||||
|
if (c != 0) {
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
index++;
|
||||||
|
}
|
||||||
|
if (index < s1.length) {
|
||||||
|
// s1 is longer
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
//s2 is longer
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
writeTo(System.out);
|
writeTo(System.out);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.util;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category(SmallTests.class)
|
||||||
|
public class TestVersionInfo {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCompareVersion() {
|
||||||
|
assertTrue(VersionInfo.compareVersion("1.0.0", "0.98.11") > 0);
|
||||||
|
assertTrue(VersionInfo.compareVersion("0.98.11", "1.0.1") < 0);
|
||||||
|
assertTrue(VersionInfo.compareVersion("2.0.0", "1.4.0") > 0);
|
||||||
|
assertTrue(VersionInfo.compareVersion("2.0.0", "2.0.0-SNAPSHOT") < 0);
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,6 +24,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -104,6 +105,7 @@ import org.apache.hadoop.hbase.util.PairOfSameType;
|
||||||
import org.apache.hadoop.hbase.util.RetryCounter;
|
import org.apache.hadoop.hbase.util.RetryCounter;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.util.Triple;
|
import org.apache.hadoop.hbase.util.Triple;
|
||||||
|
import org.apache.hadoop.hbase.util.VersionInfo;
|
||||||
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
|
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
|
||||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||||
|
@ -269,6 +271,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
|
|
||||||
private RetryCounter.BackoffPolicy backoffPolicy;
|
private RetryCounter.BackoffPolicy backoffPolicy;
|
||||||
private RetryCounter.RetryConfig retryConfig;
|
private RetryCounter.RetryConfig retryConfig;
|
||||||
|
|
||||||
|
private final Object checkIfShouldMoveSystemRegionLock = new Object();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a new assignment manager.
|
* Constructs a new assignment manager.
|
||||||
*
|
*
|
||||||
|
@ -1143,12 +1148,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
if (regionState != null) {
|
if (regionState != null) {
|
||||||
// When there are more than one region server a new RS is selected as the
|
// When there are more than one region server a new RS is selected as the
|
||||||
// destination and the same is updated in the regionplan. (HBASE-5546)
|
// destination and the same is updated in the regionplan. (HBASE-5546)
|
||||||
try {
|
getRegionPlan(regionState.getRegion(), sn, true);
|
||||||
getRegionPlan(regionState.getRegion(), sn, true);
|
new ClosedRegionHandler(server, this, regionState.getRegion()).process();
|
||||||
new ClosedRegionHandler(server, this, regionState.getRegion()).process();
|
|
||||||
} catch (HBaseIOException e) {
|
|
||||||
LOG.warn("Failed to get region plan", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -2474,6 +2475,36 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
return versionOfOfflineNode;
|
return versionOfOfflineNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a list of servers that this region can not assign to.
|
||||||
|
* For system table, we must assign them to a server with highest version.
|
||||||
|
* RS will report to master before register on zk, and only when RS have registered on zk we can
|
||||||
|
* know the version. So in fact we will never assign a system region to a RS without registering on zk.
|
||||||
|
*/
|
||||||
|
public List<ServerName> getExcludedServersForSystemTable() {
|
||||||
|
List<Pair<ServerName, String>> serverList = new ArrayList<>();
|
||||||
|
for (ServerName s : serverManager.getOnlineServersList()) {
|
||||||
|
serverList.add(new Pair<>(s, server.getRegionServerVersion(s)));
|
||||||
|
}
|
||||||
|
if (serverList.isEmpty()) {
|
||||||
|
return new ArrayList<>();
|
||||||
|
}
|
||||||
|
String highestVersion = Collections.max(serverList, new Comparator<Pair<ServerName, String>>() {
|
||||||
|
@Override
|
||||||
|
public int compare(Pair<ServerName, String> o1, Pair<ServerName, String> o2) {
|
||||||
|
return VersionInfo.compareVersion(o1.getSecond(), o2.getSecond());
|
||||||
|
}
|
||||||
|
}).getSecond();
|
||||||
|
List<ServerName> res = new ArrayList<>();
|
||||||
|
for (Pair<ServerName, String> pair : serverList) {
|
||||||
|
if (!pair.getSecond().equals(highestVersion)) {
|
||||||
|
res.add(pair.getFirst());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param region the region to assign
|
* @param region the region to assign
|
||||||
* @return Plan for passed <code>region</code> (If none currently, it creates one or
|
* @return Plan for passed <code>region</code> (If none currently, it creates one or
|
||||||
|
@ -2494,11 +2525,18 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
* if no servers to assign, it returns null).
|
* if no servers to assign, it returns null).
|
||||||
*/
|
*/
|
||||||
private RegionPlan getRegionPlan(final HRegionInfo region,
|
private RegionPlan getRegionPlan(final HRegionInfo region,
|
||||||
final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
|
final ServerName serverToExclude, final boolean forceNewPlan) {
|
||||||
// Pickup existing plan or make a new one
|
// Pickup existing plan or make a new one
|
||||||
final String encodedName = region.getEncodedName();
|
final String encodedName = region.getEncodedName();
|
||||||
|
List<ServerName> exclude = new ArrayList<>();
|
||||||
|
if (region.isSystemTable()) {
|
||||||
|
exclude.addAll(getExcludedServersForSystemTable());
|
||||||
|
}
|
||||||
|
if (serverToExclude !=null) {
|
||||||
|
exclude.add(serverToExclude);
|
||||||
|
}
|
||||||
final List<ServerName> destServers =
|
final List<ServerName> destServers =
|
||||||
serverManager.createDestinationServersList(serverToExclude);
|
serverManager.createDestinationServersList(exclude);
|
||||||
|
|
||||||
if (destServers.isEmpty()){
|
if (destServers.isEmpty()){
|
||||||
LOG.warn("Can't move " + encodedName +
|
LOG.warn("Can't move " + encodedName +
|
||||||
|
@ -2528,7 +2566,12 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (newPlan) {
|
if (newPlan) {
|
||||||
ServerName destination = balancer.randomAssignment(region, destServers);
|
ServerName destination = null;
|
||||||
|
try {
|
||||||
|
destination = balancer.randomAssignment(region, destServers);
|
||||||
|
} catch (HBaseIOException e) {
|
||||||
|
LOG.warn(e);
|
||||||
|
}
|
||||||
if (destination == null) {
|
if (destination == null) {
|
||||||
LOG.warn("Can't find a destination for " + encodedName);
|
LOG.warn("Can't find a destination for " + encodedName);
|
||||||
return null;
|
return null;
|
||||||
|
@ -2569,6 +2612,47 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start a new thread to check if there are region servers whose versions are higher than others.
|
||||||
|
* If so, move all system table regions to RS with the highest version to keep compatibility.
|
||||||
|
* The reason is, RS in new version may not be able to access RS in old version when there are
|
||||||
|
* some incompatible changes.
|
||||||
|
*/
|
||||||
|
public void checkIfShouldMoveSystemRegionAsync() {
|
||||||
|
new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
synchronized (checkIfShouldMoveSystemRegionLock) {
|
||||||
|
// RS register on ZK after reports startup on master
|
||||||
|
List<HRegionInfo> regionsShouldMove = new ArrayList<>();
|
||||||
|
for (ServerName server : getExcludedServersForSystemTable()) {
|
||||||
|
regionsShouldMove.addAll(getCarryingSystemTables(server));
|
||||||
|
}
|
||||||
|
if (!regionsShouldMove.isEmpty()) {
|
||||||
|
List<RegionPlan> plans = new ArrayList<>();
|
||||||
|
for (HRegionInfo regionInfo : regionsShouldMove) {
|
||||||
|
RegionPlan plan = getRegionPlan(regionInfo, true);
|
||||||
|
if (regionInfo.isMetaRegion()) {
|
||||||
|
// Must move meta region first.
|
||||||
|
balance(plan);
|
||||||
|
} else {
|
||||||
|
plans.add(plan);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (RegionPlan plan : plans) {
|
||||||
|
balance(plan);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.error(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unassigns the specified region.
|
* Unassigns the specified region.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -3519,6 +3603,20 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
return isCarryingRegion(serverName, metaHri);
|
return isCarryingRegion(serverName, metaHri);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<HRegionInfo> getCarryingSystemTables(ServerName serverName) {
|
||||||
|
Set<HRegionInfo> regions = this.getRegionStates().getServerRegions(serverName);
|
||||||
|
if (regions == null) {
|
||||||
|
return new ArrayList<>();
|
||||||
|
}
|
||||||
|
List<HRegionInfo> list = new ArrayList<>();
|
||||||
|
for (HRegionInfo region : regions) {
|
||||||
|
if (region.isSystemTable()) {
|
||||||
|
list.add(region);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if the shutdown server carries the specific region.
|
* Check if the shutdown server carries the specific region.
|
||||||
* We have a bunch of places that store region location
|
* We have a bunch of places that store region location
|
||||||
|
@ -3808,11 +3906,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
regionStates.updateRegionState(hri, RegionState.State.CLOSED);
|
regionStates.updateRegionState(hri, RegionState.State.CLOSED);
|
||||||
// This below has to do w/ online enable/disable of a table
|
// This below has to do w/ online enable/disable of a table
|
||||||
removeClosedRegion(hri);
|
removeClosedRegion(hri);
|
||||||
try {
|
getRegionPlan(hri, sn, true);
|
||||||
getRegionPlan(hri, sn, true);
|
|
||||||
} catch (HBaseIOException e) {
|
|
||||||
LOG.warn("Failed to get region plan", e);
|
|
||||||
}
|
|
||||||
invokeAssign(hri, false);
|
invokeAssign(hri, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -892,6 +892,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
||||||
// Set master as 'initialized'.
|
// Set master as 'initialized'.
|
||||||
setInitialized(true);
|
setInitialized(true);
|
||||||
|
|
||||||
|
assignmentManager.checkIfShouldMoveSystemRegionAsync();
|
||||||
|
|
||||||
status.setStatus("Starting quota manager");
|
status.setStatus("Starting quota manager");
|
||||||
initQuotaManager();
|
initQuotaManager();
|
||||||
|
|
||||||
|
@ -1625,7 +1627,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
||||||
|
|
||||||
@VisibleForTesting // Public so can be accessed by tests.
|
@VisibleForTesting // Public so can be accessed by tests.
|
||||||
public void move(final byte[] encodedRegionName,
|
public void move(final byte[] encodedRegionName,
|
||||||
final byte[] destServerName) throws HBaseIOException {
|
byte[] destServerName) throws HBaseIOException {
|
||||||
RegionState regionState = assignmentManager.getRegionStates().
|
RegionState regionState = assignmentManager.getRegionStates().
|
||||||
getRegionState(Bytes.toString(encodedRegionName));
|
getRegionState(Bytes.toString(encodedRegionName));
|
||||||
if (regionState == null) {
|
if (regionState == null) {
|
||||||
|
@ -1639,11 +1641,20 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
||||||
|
|
||||||
HRegionInfo hri = regionState.getRegion();
|
HRegionInfo hri = regionState.getRegion();
|
||||||
ServerName dest;
|
ServerName dest;
|
||||||
|
List<ServerName> exclude = hri.isSystemTable() ? assignmentManager.getExcludedServersForSystemTable()
|
||||||
|
: new ArrayList<ServerName>(1);
|
||||||
|
if (destServerName != null && exclude.contains(ServerName.valueOf(Bytes.toString(destServerName)))) {
|
||||||
|
LOG.info(
|
||||||
|
Bytes.toString(encodedRegionName) + " can not move to " + Bytes.toString(destServerName)
|
||||||
|
+ " because the server is in exclude list");
|
||||||
|
destServerName = null;
|
||||||
|
}
|
||||||
|
|
||||||
if (destServerName == null || destServerName.length == 0) {
|
if (destServerName == null || destServerName.length == 0) {
|
||||||
LOG.info("Passed destination servername is null/empty so " +
|
LOG.info("Passed destination servername is null/empty so " +
|
||||||
"choosing a server at random");
|
"choosing a server at random");
|
||||||
final List<ServerName> destServers = this.serverManager.createDestinationServersList(
|
exclude.add(regionState.getServerName());
|
||||||
regionState.getServerName());
|
final List<ServerName> destServers = this.serverManager.createDestinationServersList(exclude);
|
||||||
dest = balancer.randomAssignment(hri, destServers);
|
dest = balancer.randomAssignment(hri, destServers);
|
||||||
if (dest == null) {
|
if (dest == null) {
|
||||||
LOG.debug("Unable to determine a plan to assign " + hri);
|
LOG.debug("Unable to determine a plan to assign " + hri);
|
||||||
|
@ -2501,7 +2512,12 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
||||||
if (info != null && info.hasVersionInfo()) {
|
if (info != null && info.hasVersionInfo()) {
|
||||||
return info.getVersionInfo().getVersion();
|
return info.getVersionInfo().getVersion();
|
||||||
}
|
}
|
||||||
return "Unknown";
|
return "0.0.0"; //Lowest version to prevent move system region to unknown version RS.
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void checkIfShouldMoveSystemRegionAsync() {
|
||||||
|
assignmentManager.checkIfShouldMoveSystemRegionAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
@ -406,4 +407,8 @@ public interface MasterServices extends Server {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException;
|
public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException;
|
||||||
|
|
||||||
|
public String getRegionServerVersion(final ServerName sn);
|
||||||
|
|
||||||
|
public void checkIfShouldMoveSystemRegionAsync();
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.RetryCounter;
|
import org.apache.hadoop.hbase.util.RetryCounter;
|
||||||
import org.apache.hadoop.hbase.util.RetryCounterFactory;
|
import org.apache.hadoop.hbase.util.RetryCounterFactory;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.util.Triple;
|
import org.apache.hadoop.hbase.util.Triple;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
@ -1247,13 +1248,13 @@ public class ServerManager {
|
||||||
/**
|
/**
|
||||||
* Creates a list of possible destinations for a region. It contains the online servers, but not
|
* Creates a list of possible destinations for a region. It contains the online servers, but not
|
||||||
* the draining or dying servers.
|
* the draining or dying servers.
|
||||||
* @param serverToExclude can be null if there is no server to exclude
|
* @param serversToExclude can be null if there is no server to exclude
|
||||||
*/
|
*/
|
||||||
public List<ServerName> createDestinationServersList(final ServerName serverToExclude){
|
public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude){
|
||||||
final List<ServerName> destServers = getOnlineServersList();
|
final List<ServerName> destServers = getOnlineServersList();
|
||||||
|
|
||||||
if (serverToExclude != null){
|
if (serversToExclude != null){
|
||||||
destServers.remove(serverToExclude);
|
destServers.removeAll(serversToExclude);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Loop through the draining server list and remove them from the server list
|
// Loop through the draining server list and remove them from the server list
|
||||||
|
|
|
@ -30,6 +30,9 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||||
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
import org.apache.hadoop.hbase.master.ServerManager;
|
import org.apache.hadoop.hbase.master.ServerManager;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
|
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
|
||||||
|
@ -51,10 +54,10 @@ public class RegionServerTracker extends ZooKeeperListener {
|
||||||
private NavigableMap<ServerName, RegionServerInfo> regionServers =
|
private NavigableMap<ServerName, RegionServerInfo> regionServers =
|
||||||
new TreeMap<ServerName, RegionServerInfo>();
|
new TreeMap<ServerName, RegionServerInfo>();
|
||||||
private ServerManager serverManager;
|
private ServerManager serverManager;
|
||||||
private Server server;
|
private MasterServices server;
|
||||||
|
|
||||||
public RegionServerTracker(ZooKeeperWatcher watcher,
|
public RegionServerTracker(ZooKeeperWatcher watcher,
|
||||||
Server server, ServerManager serverManager) {
|
MasterServices server, ServerManager serverManager) {
|
||||||
super(watcher);
|
super(watcher);
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.serverManager = serverManager;
|
this.serverManager = serverManager;
|
||||||
|
@ -72,10 +75,10 @@ public class RegionServerTracker extends ZooKeeperListener {
|
||||||
watcher.registerListener(this);
|
watcher.registerListener(this);
|
||||||
List<String> servers =
|
List<String> servers =
|
||||||
ZKUtil.listChildrenAndWatchThem(watcher, watcher.rsZNode);
|
ZKUtil.listChildrenAndWatchThem(watcher, watcher.rsZNode);
|
||||||
add(servers);
|
refresh(servers);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void add(final List<String> servers) throws IOException {
|
private void refresh(final List<String> servers) throws IOException {
|
||||||
synchronized(this.regionServers) {
|
synchronized(this.regionServers) {
|
||||||
this.regionServers.clear();
|
this.regionServers.clear();
|
||||||
for (String n: servers) {
|
for (String n: servers) {
|
||||||
|
@ -103,6 +106,10 @@ public class RegionServerTracker extends ZooKeeperListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (server.isInitialized()) {
|
||||||
|
server.checkIfShouldMoveSystemRegionAsync();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void remove(final ServerName sn) {
|
private void remove(final ServerName sn) {
|
||||||
|
@ -135,7 +142,7 @@ public class RegionServerTracker extends ZooKeeperListener {
|
||||||
try {
|
try {
|
||||||
List<String> servers =
|
List<String> servers =
|
||||||
ZKUtil.listChildrenAndWatchThem(watcher, watcher.rsZNode);
|
ZKUtil.listChildrenAndWatchThem(watcher, watcher.rsZNode);
|
||||||
add(servers);
|
refresh(servers);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
server.abort("Unexpected zk exception getting RS nodes", e);
|
server.abort("Unexpected zk exception getting RS nodes", e);
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
|
|
|
@ -109,6 +109,7 @@ public class TestDrainingServer {
|
||||||
Mockito.when(server.getConfiguration()).thenReturn(conf);
|
Mockito.when(server.getConfiguration()).thenReturn(conf);
|
||||||
Mockito.when(server.getServerName()).thenReturn(ServerName.valueOf("masterMock,1,1"));
|
Mockito.when(server.getServerName()).thenReturn(ServerName.valueOf("masterMock,1,1"));
|
||||||
Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher);
|
Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher);
|
||||||
|
Mockito.when(server.getRegionServerVersion(Mockito.any(ServerName.class))).thenReturn("0.0.0");
|
||||||
|
|
||||||
CoordinatedStateManager cp = new ZkCoordinatedStateManager();
|
CoordinatedStateManager cp = new ZkCoordinatedStateManager();
|
||||||
cp.initialize(server);
|
cp.initialize(server);
|
||||||
|
@ -124,6 +125,8 @@ public class TestDrainingServer {
|
||||||
.thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
|
.thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
|
||||||
Mockito.when(serverManager.createDestinationServersList(null))
|
Mockito.when(serverManager.createDestinationServersList(null))
|
||||||
.thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
|
.thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
|
||||||
|
Mockito.when(serverManager.createDestinationServersList(Mockito.anyList())).thenReturn(
|
||||||
|
new ArrayList<ServerName>(onlineServers.keySet()));
|
||||||
|
|
||||||
for (ServerName sn : onlineServers.keySet()) {
|
for (ServerName sn : onlineServers.keySet()) {
|
||||||
Mockito.when(serverManager.isServerOnline(sn)).thenReturn(true);
|
Mockito.when(serverManager.isServerOnline(sn)).thenReturn(true);
|
||||||
|
@ -225,6 +228,8 @@ public class TestDrainingServer {
|
||||||
new ArrayList<ServerName>(onlineServers.keySet()));
|
new ArrayList<ServerName>(onlineServers.keySet()));
|
||||||
Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(
|
Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(
|
||||||
new ArrayList<ServerName>(onlineServers.keySet()));
|
new ArrayList<ServerName>(onlineServers.keySet()));
|
||||||
|
Mockito.when(serverManager.createDestinationServersList(Mockito.anyList())).thenReturn(
|
||||||
|
new ArrayList<ServerName>(onlineServers.keySet()));
|
||||||
|
|
||||||
for (Entry<HRegionInfo, ServerName> entry : bulk.entrySet()) {
|
for (Entry<HRegionInfo, ServerName> entry : bulk.entrySet()) {
|
||||||
Mockito.when(serverManager.isServerOnline(entry.getValue())).thenReturn(true);
|
Mockito.when(serverManager.isServerOnline(entry.getValue())).thenReturn(true);
|
||||||
|
@ -284,6 +289,8 @@ public class TestDrainingServer {
|
||||||
List<ServerName> availableServers = new ArrayList<ServerName>(onlineServers.keySet());
|
List<ServerName> availableServers = new ArrayList<ServerName>(onlineServers.keySet());
|
||||||
Mockito.when(serverManager.createDestinationServersList()).thenReturn(availableServers);
|
Mockito.when(serverManager.createDestinationServersList()).thenReturn(availableServers);
|
||||||
Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(availableServers);
|
Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(availableServers);
|
||||||
|
Mockito.when(serverManager.createDestinationServersList(Mockito.anyList())).thenReturn(
|
||||||
|
new ArrayList<ServerName>(onlineServers.keySet()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setRegionOpenedOnZK(final ZooKeeperWatcher zkWatcher, final ServerName serverName,
|
private void setRegionOpenedOnZK(final ZooKeeperWatcher zkWatcher, final ServerName serverName,
|
||||||
|
|
|
@ -239,6 +239,15 @@ public class MockNoopMasterServices implements MasterServices, Server {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getRegionServerVersion(ServerName sn) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void checkIfShouldMoveSystemRegionAsync() {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Configuration getConfiguration() {
|
public Configuration getConfiguration() {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -335,7 +335,6 @@ public class TestAssignmentListener {
|
||||||
// Now, we follow the same order of steps that the HMaster does to setup
|
// Now, we follow the same order of steps that the HMaster does to setup
|
||||||
// the ServerManager, RegionServerTracker, and DrainingServerTracker.
|
// the ServerManager, RegionServerTracker, and DrainingServerTracker.
|
||||||
ServerManager serverManager = new ServerManager(master, services);
|
ServerManager serverManager = new ServerManager(master, services);
|
||||||
|
|
||||||
RegionServerTracker regionServerTracker = new RegionServerTracker(
|
RegionServerTracker regionServerTracker = new RegionServerTracker(
|
||||||
zooKeeper, master, serverManager);
|
zooKeeper, master, serverManager);
|
||||||
regionServerTracker.start();
|
regionServerTracker.start();
|
||||||
|
|
|
@ -558,6 +558,15 @@ public class TestCatalogJanitor {
|
||||||
// Auto-generated method stub
|
// Auto-generated method stub
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getRegionServerVersion(ServerName sn) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void checkIfShouldMoveSystemRegionAsync() {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue