HBASE-17931 Assign system tables to servers with highest version
This commit is contained in:
parent
381a151d14
commit
1ead5f9103
|
@ -33,6 +33,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
public class VersionInfo {
|
||||
private static final Log LOG = LogFactory.getLog(VersionInfo.class.getName());
|
||||
|
||||
// If between two dots there is not a number, we regard it as a very large number so it is
|
||||
// higher than any numbers in the version.
|
||||
private static int VERY_LARGE_NUMBER = 100000;
|
||||
|
||||
/**
|
||||
* Get the hbase version.
|
||||
* @return the hbase version string, eg. "0.6.3-dev"
|
||||
|
@ -108,6 +112,45 @@ public class VersionInfo {
|
|||
}
|
||||
}
|
||||
|
||||
public static int compareVersion(String v1, String v2) {
|
||||
//fast compare equals first
|
||||
if (v1.equals(v2)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
String s1[] = v1.split("\\.|-");//1.2.3-hotfix -> [1, 2, 3, hotfix]
|
||||
String s2[] = v2.split("\\.|-");
|
||||
int index = 0;
|
||||
while (index < s1.length && index < s2.length) {
|
||||
int va = VERY_LARGE_NUMBER, vb = VERY_LARGE_NUMBER;
|
||||
try {
|
||||
va = Integer.parseInt(s1[index]);
|
||||
} catch (Exception ingore) {
|
||||
}
|
||||
try {
|
||||
vb = Integer.parseInt(s2[index]);
|
||||
} catch (Exception ingore) {
|
||||
}
|
||||
if (va != vb) {
|
||||
return va - vb;
|
||||
}
|
||||
if (va == VERY_LARGE_NUMBER) {
|
||||
// compare as String
|
||||
int c = s1[index].compareTo(s2[index]);
|
||||
if (c != 0) {
|
||||
return c;
|
||||
}
|
||||
}
|
||||
index++;
|
||||
}
|
||||
if (index < s1.length) {
|
||||
// s1 is longer
|
||||
return 1;
|
||||
}
|
||||
//s2 is longer
|
||||
return -1;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
writeTo(System.out);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestVersionInfo {
|
||||
|
||||
@Test
|
||||
public void testCompareVersion() {
|
||||
assertTrue(VersionInfo.compareVersion("1.0.0", "0.98.11") > 0);
|
||||
assertTrue(VersionInfo.compareVersion("0.98.11", "1.0.1") < 0);
|
||||
assertTrue(VersionInfo.compareVersion("2.0.0", "1.4.0") > 0);
|
||||
assertTrue(VersionInfo.compareVersion("2.0.0", "2.0.0-SNAPSHOT") < 0);
|
||||
}
|
||||
}
|
|
@ -884,6 +884,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// Set master as 'initialized'.
|
||||
setInitialized(true);
|
||||
|
||||
assignmentManager.checkIfShouldMoveSystemRegionAsync();
|
||||
|
||||
status.setStatus("Assign meta replicas");
|
||||
metaBootstrap.assignMetaReplicas();
|
||||
|
||||
|
@ -1647,8 +1649,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// Replace with an async implementation from which you can get
|
||||
// a success/failure result.
|
||||
@VisibleForTesting
|
||||
public void move(final byte[] encodedRegionName,
|
||||
final byte[] destServerName) throws HBaseIOException {
|
||||
public void move(final byte[] encodedRegionName, byte[] destServerName) throws HBaseIOException {
|
||||
RegionState regionState = assignmentManager.getRegionStates().
|
||||
getRegionState(Bytes.toString(encodedRegionName));
|
||||
|
||||
|
@ -1660,11 +1661,19 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
|
||||
ServerName dest;
|
||||
List<ServerName> exclude = hri.isSystemTable() ? assignmentManager.getExcludedServersForSystemTable()
|
||||
: new ArrayList<>(1);
|
||||
if (destServerName != null && exclude.contains(ServerName.valueOf(Bytes.toString(destServerName)))) {
|
||||
LOG.info(
|
||||
Bytes.toString(encodedRegionName) + " can not move to " + Bytes.toString(destServerName)
|
||||
+ " because the server is in exclude list");
|
||||
destServerName = null;
|
||||
}
|
||||
if (destServerName == null || destServerName.length == 0) {
|
||||
LOG.info("Passed destination servername is null/empty so " +
|
||||
"choosing a server at random");
|
||||
final List<ServerName> destServers = this.serverManager.createDestinationServersList(
|
||||
regionState.getServerName());
|
||||
exclude.add(regionState.getServerName());
|
||||
final List<ServerName> destServers = this.serverManager.createDestinationServersList(exclude);
|
||||
dest = balancer.randomAssignment(hri, destServers);
|
||||
if (dest == null) {
|
||||
LOG.debug("Unable to determine a plan to assign " + hri);
|
||||
|
@ -2575,7 +2584,12 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
if (info != null && info.hasVersionInfo()) {
|
||||
return info.getVersionInfo().getVersion();
|
||||
}
|
||||
return "Unknown";
|
||||
return "0.0.0"; //Lowest version to prevent move system region to unknown version RS.
|
||||
}
|
||||
|
||||
/**
 * Forwards the request to {@code assignmentManager.checkIfShouldMoveSystemRegionAsync()}.
 */
@Override
|
||||
public void checkIfShouldMoveSystemRegionAsync() {
|
||||
assignmentManager.checkIfShouldMoveSystemRegionAsync();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -513,4 +513,8 @@ public interface MasterServices extends Server {
|
|||
* @return {@link LockManager} to lock namespaces/tables/regions.
|
||||
*/
|
||||
LockManager getLockManager();
|
||||
|
||||
/**
 * Looks up the version string of a region server.
 * NOTE(review): what is returned for a server with no known version is decided by the
 * implementation and is not specified here; confirm against implementors.
 * @param sn the region server to look up
 * @return the version string for {@code sn}
 */
public String getRegionServerVersion(final ServerName sn);
|
||||
|
||||
/**
 * Requests a check of whether system table regions should be moved.
 * NOTE(review): the "Async" suffix suggests the call does not block; confirm against implementors.
 */
public void checkIfShouldMoveSystemRegionAsync();
|
||||
}
|
||||
|
|
|
@ -1206,13 +1206,13 @@ public class ServerManager {
|
|||
/**
|
||||
* Creates a list of possible destinations for a region. It contains the online servers, but not
|
||||
* the draining or dying servers.
|
||||
* @param serverToExclude can be null if there is no server to exclude
|
||||
* @param serversToExclude can be null if there is no server to exclude
|
||||
*/
|
||||
public List<ServerName> createDestinationServersList(final ServerName serverToExclude){
|
||||
public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude){
|
||||
final List<ServerName> destServers = getOnlineServersList();
|
||||
|
||||
if (serverToExclude != null){
|
||||
destServers.remove(serverToExclude);
|
||||
if (serversToExclude != null){
|
||||
destServers.removeAll(serversToExclude);
|
||||
}
|
||||
|
||||
// Loop through the draining server list and remove them from the server list
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
@ -86,8 +87,10 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.util.VersionInfo;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* The AssignmentManager is the coordinator for region assign/unassign operations.
|
||||
|
@ -170,6 +173,8 @@ public class AssignmentManager implements ServerListener {
|
|||
private final int assignDispatchWaitMillis;
|
||||
private final int assignMaxAttempts;
|
||||
|
||||
private final Object checkIfShouldMoveSystemRegionLock = new Object();
|
||||
|
||||
private Thread assignThread;
|
||||
|
||||
public AssignmentManager(final MasterServices master) {
|
||||
|
@ -452,6 +457,55 @@ public class AssignmentManager implements ServerListener {
|
|||
ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a new thread to check if there are region servers whose versions are higher than others.
|
||||
* If so, move all system table regions to RS with the highest version to keep compatibility.
|
||||
* The reason is, RS in new version may not be able to access RS in old version when there are
|
||||
* some incompatible changes.
|
||||
*/
|
||||
public void checkIfShouldMoveSystemRegionAsync() {
|
||||
new Thread(() -> {
|
||||
try {
|
||||
synchronized (checkIfShouldMoveSystemRegionLock) {
|
||||
List<ServerName> serverList = master.getServerManager()
|
||||
.createDestinationServersList(getExcludedServersForSystemTable());
|
||||
List<RegionPlan> plans = new ArrayList<>();
|
||||
for (ServerName server : getExcludedServersForSystemTable()) {
|
||||
List<HRegionInfo> regionsShouldMove = getCarryingSystemTables(server);
|
||||
if (!regionsShouldMove.isEmpty()) {
|
||||
for (HRegionInfo regionInfo : regionsShouldMove) {
|
||||
RegionPlan plan = new RegionPlan(regionInfo, server,
|
||||
getBalancer().randomAssignment(regionInfo, serverList));
|
||||
if (regionInfo.isMetaRegion()) {
|
||||
// Must move meta region first.
|
||||
moveAsync(plan);
|
||||
} else {
|
||||
plans.add(plan);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (RegionPlan plan : plans) {
|
||||
moveAsync(plan);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.error(t);
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
|
||||
private List<HRegionInfo> getCarryingSystemTables(ServerName serverName) {
|
||||
Set<RegionStateNode> regions = this.getRegionStates().getServerNode(serverName).getRegions();
|
||||
if (regions == null) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
return regions.stream()
|
||||
.map(RegionStateNode::getRegionInfo)
|
||||
.filter(HRegionInfo::isSystemTable)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public void assign(final HRegionInfo regionInfo) throws IOException {
|
||||
assign(regionInfo, true);
|
||||
}
|
||||
|
@ -617,6 +671,19 @@ public class AssignmentManager implements ServerListener {
|
|||
}
|
||||
|
||||
public MoveRegionProcedure createMoveRegionProcedure(final RegionPlan plan) {
|
||||
if (plan.getRegionInfo().isSystemTable()) {
|
||||
List<ServerName> exclude = getExcludedServersForSystemTable();
|
||||
if (plan.getDestination() != null && exclude.contains(plan.getDestination())) {
|
||||
try {
|
||||
LOG.info("Can not move " + plan.getRegionInfo() + " to " + plan.getDestination()
|
||||
+ " because the server is not with highest version");
|
||||
plan.setDestination(getBalancer().randomAssignment(plan.getRegionInfo(),
|
||||
this.master.getServerManager().createDestinationServersList(exclude)));
|
||||
} catch (HBaseIOException e) {
|
||||
LOG.warn(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return new MoveRegionProcedure(getProcedureEnvironment(), plan);
|
||||
}
|
||||
|
||||
|
@ -1686,6 +1753,26 @@ public class AssignmentManager implements ServerListener {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a list of servers that this region can not assign to.
|
||||
* For system table, we must assign them to a server with highest version.
|
||||
*/
|
||||
public List<ServerName> getExcludedServersForSystemTable() {
|
||||
List<Pair<ServerName, String>> serverList = master.getServerManager().getOnlineServersList()
|
||||
.stream()
|
||||
.map((s)->new Pair<>(s, master.getRegionServerVersion(s)))
|
||||
.collect(Collectors.toList());
|
||||
if (serverList.isEmpty()) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
String highestVersion = Collections.max(serverList,
|
||||
(o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond();
|
||||
return serverList.stream()
|
||||
.filter((p)->!p.getSecond().equals(highestVersion))
|
||||
.map(Pair::getFirst)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
// ============================================================================================
|
||||
// Server Helpers
|
||||
// ============================================================================================
|
||||
|
|
|
@ -27,9 +27,9 @@ import java.util.TreeMap;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
|
||||
|
@ -50,10 +50,10 @@ public class RegionServerTracker extends ZooKeeperListener {
|
|||
private static final Log LOG = LogFactory.getLog(RegionServerTracker.class);
|
||||
private NavigableMap<ServerName, RegionServerInfo> regionServers = new TreeMap<>();
|
||||
private ServerManager serverManager;
|
||||
private Server server;
|
||||
private MasterServices server;
|
||||
|
||||
public RegionServerTracker(ZooKeeperWatcher watcher,
|
||||
Server server, ServerManager serverManager) {
|
||||
MasterServices server, ServerManager serverManager) {
|
||||
super(watcher);
|
||||
this.server = server;
|
||||
this.serverManager = serverManager;
|
||||
|
@ -71,10 +71,10 @@ public class RegionServerTracker extends ZooKeeperListener {
|
|||
watcher.registerListener(this);
|
||||
List<String> servers =
|
||||
ZKUtil.listChildrenAndWatchThem(watcher, watcher.znodePaths.rsZNode);
|
||||
add(servers);
|
||||
refresh(servers);
|
||||
}
|
||||
|
||||
private void add(final List<String> servers) throws IOException {
|
||||
private void refresh(final List<String> servers) throws IOException {
|
||||
synchronized(this.regionServers) {
|
||||
this.regionServers.clear();
|
||||
for (String n: servers) {
|
||||
|
@ -102,6 +102,9 @@ public class RegionServerTracker extends ZooKeeperListener {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (server.isInitialized()) {
|
||||
server.checkIfShouldMoveSystemRegionAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private void remove(final ServerName sn) {
|
||||
|
@ -134,7 +137,7 @@ public class RegionServerTracker extends ZooKeeperListener {
|
|||
try {
|
||||
List<String> servers =
|
||||
ZKUtil.listChildrenAndWatchThem(watcher, watcher.znodePaths.rsZNode);
|
||||
add(servers);
|
||||
refresh(servers);
|
||||
} catch (IOException e) {
|
||||
server.abort("Unexpected zk exception getting RS nodes", e);
|
||||
} catch (KeeperException e) {
|
||||
|
|
|
@ -442,6 +442,15 @@ public class MockNoopMasterServices implements MasterServices, Server {
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
 * No-op mock implementation: no version is tracked, so this always returns null.
 */
@Override
|
||||
public String getRegionServerVersion(ServerName sn) {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
 * No-op mock implementation: intentionally does nothing.
 */
@Override
|
||||
public void checkIfShouldMoveSystemRegionAsync() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public long dispatchMergingRegions(HRegionInfo region_a, HRegionInfo region_b, boolean forcible, long nonceGroup,
|
||||
long nonce) throws IOException {
|
||||
|
|
Loading…
Reference in New Issue