HBASE-24795 : RegionMover to deal with unknown region while (un)loading
* RegionMover to ignore move failures for split/merged regions with ack mode * Refactor MoveWithAck and MoveWithoutAck as high level classes * UT for RegionMover gracefully handling split/merged regions while loading regions and throwing failure while loading offline regions Closes #2172 Signed-off-by: Sean Busbey <busbey@apache.org> Signed-off-by: Ted Yu <tyu@apache.org>
This commit is contained in:
parent
8e33bb04bc
commit
ad7caf754a
|
@ -150,6 +150,7 @@ public class AssignmentManager {
|
|||
public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
|
||||
"hbase.metrics.rit.stuck.warning.threshold";
|
||||
private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
|
||||
public static final String UNEXPECTED_STATE_REGION = "Unexpected state for ";
|
||||
|
||||
private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
|
||||
private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");
|
||||
|
@ -588,7 +589,7 @@ public class AssignmentManager {
|
|||
throw new HBaseIOException(regionNode + " is currently in transition");
|
||||
}
|
||||
if (!regionNode.isInState(expectedStates)) {
|
||||
throw new DoNotRetryRegionException("Unexpected state for " + regionNode);
|
||||
throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode);
|
||||
}
|
||||
if (isTableDisabled(regionNode.getTable())) {
|
||||
throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
|
||||
|
|
|
@ -0,0 +1,158 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
* Move Regions and make sure that they are up on the target server.If a region movement fails we
|
||||
* exit as failure
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class MoveWithAck implements Callable<Boolean> {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MoveWithAck.class);
|
||||
|
||||
private final RegionInfo region;
|
||||
private final ServerName targetServer;
|
||||
private final List<RegionInfo> movedRegions;
|
||||
private final ServerName sourceServer;
|
||||
private final Connection conn;
|
||||
private final Admin admin;
|
||||
|
||||
MoveWithAck(Connection conn, RegionInfo regionInfo, ServerName sourceServer,
|
||||
ServerName targetServer, List<RegionInfo> movedRegions) throws IOException {
|
||||
this.conn = conn;
|
||||
this.region = regionInfo;
|
||||
this.targetServer = targetServer;
|
||||
this.movedRegions = movedRegions;
|
||||
this.sourceServer = sourceServer;
|
||||
this.admin = conn.getAdmin();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean call() throws IOException, InterruptedException {
|
||||
boolean moved = false;
|
||||
int count = 0;
|
||||
int retries = admin.getConfiguration()
|
||||
.getInt(RegionMover.MOVE_RETRIES_MAX_KEY, RegionMover.DEFAULT_MOVE_RETRIES_MAX);
|
||||
int maxWaitInSeconds = admin.getConfiguration()
|
||||
.getInt(RegionMover.MOVE_WAIT_MAX_KEY, RegionMover.DEFAULT_MOVE_WAIT_MAX);
|
||||
long startTime = EnvironmentEdgeManager.currentTime();
|
||||
boolean sameServer = true;
|
||||
// Assert we can scan the region in its current location
|
||||
isSuccessfulScan(region);
|
||||
LOG.info("Moving region: {} from {} to {}", region.getRegionNameAsString(), sourceServer,
|
||||
targetServer);
|
||||
while (count < retries && sameServer) {
|
||||
if (count > 0) {
|
||||
LOG.debug("Retry {} of maximum {} for region: {}", count, retries,
|
||||
region.getRegionNameAsString());
|
||||
}
|
||||
count = count + 1;
|
||||
admin.move(region.getEncodedNameAsBytes(), targetServer);
|
||||
long maxWait = startTime + (maxWaitInSeconds * 1000);
|
||||
while (EnvironmentEdgeManager.currentTime() < maxWait) {
|
||||
sameServer = isSameServer(region, sourceServer);
|
||||
if (!sameServer) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
if (sameServer) {
|
||||
LOG.error("Region: {} stuck on {} for {} sec , newServer={}", region.getRegionNameAsString(),
|
||||
this.sourceServer, getTimeDiffInSec(startTime), this.targetServer);
|
||||
} else {
|
||||
isSuccessfulScan(region);
|
||||
LOG.info("Moved Region {} , cost (sec): {}", region.getRegionNameAsString(),
|
||||
getTimeDiffInSec(startTime));
|
||||
moved = true;
|
||||
movedRegions.add(region);
|
||||
}
|
||||
return moved;
|
||||
}
|
||||
|
||||
private static String getTimeDiffInSec(long startTime) {
|
||||
return String.format("%.3f", (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to scan a row from passed region
|
||||
*/
|
||||
private void isSuccessfulScan(RegionInfo region) throws IOException {
|
||||
Scan scan = new Scan().withStartRow(region.getStartKey()).setRaw(true).setOneRowLimit()
|
||||
.setMaxResultSize(1L).setCaching(1).setFilter(new FirstKeyOnlyFilter())
|
||||
.setCacheBlocks(false);
|
||||
try (Table table = conn.getTable(region.getTable());
|
||||
ResultScanner scanner = table.getScanner(scan)) {
|
||||
scanner.next();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Could not scan region: {}", region.getEncodedName(), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if passed region is still on serverName when we look at hbase:meta.
|
||||
* @return true if region is hosted on serverName otherwise false
|
||||
*/
|
||||
private boolean isSameServer(RegionInfo region, ServerName serverName)
|
||||
throws IOException {
|
||||
ServerName serverForRegion = getServerNameForRegion(region, admin, conn);
|
||||
return serverForRegion != null && serverForRegion.equals(serverName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get servername that is up in hbase:meta hosting the given region. this is hostname + port +
|
||||
* startcode comma-delimited. Can return null
|
||||
* @return regionServer hosting the given region
|
||||
*/
|
||||
static ServerName getServerNameForRegion(RegionInfo region, Admin admin, Connection conn)
|
||||
throws IOException {
|
||||
if (!admin.isTableEnabled(region.getTable())) {
|
||||
return null;
|
||||
}
|
||||
HRegionLocation loc =
|
||||
conn.getRegionLocator(region.getTable()).getRegionLocation(region.getStartKey(),
|
||||
region.getReplicaId(),true);
|
||||
if (loc != null) {
|
||||
return loc.getServerName();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
* Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the
|
||||
* RS down anyways and not abort on a stuck region. Improves movement performance
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class MoveWithoutAck implements Callable<Boolean> {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MoveWithoutAck.class);
|
||||
|
||||
private final RegionInfo region;
|
||||
private final ServerName targetServer;
|
||||
private final List<RegionInfo> movedRegions;
|
||||
private final ServerName sourceServer;
|
||||
private final Admin admin;
|
||||
|
||||
MoveWithoutAck(Admin admin, RegionInfo regionInfo, ServerName sourceServer,
|
||||
ServerName targetServer, List<RegionInfo> movedRegions) {
|
||||
this.admin = admin;
|
||||
this.region = regionInfo;
|
||||
this.targetServer = targetServer;
|
||||
this.movedRegions = movedRegions;
|
||||
this.sourceServer = sourceServer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean call() {
|
||||
try {
|
||||
LOG.info("Moving region: {} from {} to {}", region.getEncodedName(), sourceServer,
|
||||
targetServer);
|
||||
admin.move(region.getEncodedNameAsBytes(), targetServer);
|
||||
LOG.info("Requested move {} from {} to {}", region.getEncodedName(), sourceServer,
|
||||
targetServer);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error Moving Region: {}", region.getEncodedName(), e);
|
||||
} finally {
|
||||
movedRegions.add(region);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -39,7 +39,6 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
@ -53,16 +52,14 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.UnknownRegionException;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -259,105 +256,6 @@ public class RegionMover extends AbstractHBaseTool implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Move Regions and make sure that they are up on the target server.If a region movement fails we
|
||||
* exit as failure
|
||||
*/
|
||||
private class MoveWithAck implements Callable<Boolean> {
|
||||
private RegionInfo region;
|
||||
private ServerName targetServer;
|
||||
private List<RegionInfo> movedRegions;
|
||||
private ServerName sourceServer;
|
||||
|
||||
public MoveWithAck(RegionInfo regionInfo, ServerName sourceServer,
|
||||
ServerName targetServer, List<RegionInfo> movedRegions) {
|
||||
this.region = regionInfo;
|
||||
this.targetServer = targetServer;
|
||||
this.movedRegions = movedRegions;
|
||||
this.sourceServer = sourceServer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean call() throws IOException, InterruptedException {
|
||||
boolean moved = false;
|
||||
int count = 0;
|
||||
int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX);
|
||||
int maxWaitInSeconds =
|
||||
admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
|
||||
long startTime = EnvironmentEdgeManager.currentTime();
|
||||
boolean sameServer = true;
|
||||
// Assert we can scan the region in its current location
|
||||
isSuccessfulScan(region);
|
||||
LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
|
||||
+ targetServer);
|
||||
while (count < retries && sameServer) {
|
||||
if (count > 0) {
|
||||
LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries));
|
||||
}
|
||||
count = count + 1;
|
||||
admin.move(region.getEncodedNameAsBytes(), targetServer);
|
||||
long maxWait = startTime + (maxWaitInSeconds * 1000);
|
||||
while (EnvironmentEdgeManager.currentTime() < maxWait) {
|
||||
sameServer = isSameServer(region, sourceServer);
|
||||
if (!sameServer) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(100);
|
||||
}
|
||||
}
|
||||
if (sameServer) {
|
||||
LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer
|
||||
+ ",newServer=" + this.targetServer);
|
||||
} else {
|
||||
isSuccessfulScan(region);
|
||||
LOG.info("Moved Region "
|
||||
+ region.getRegionNameAsString()
|
||||
+ " cost:"
|
||||
+ String.format("%.3f",
|
||||
(float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000));
|
||||
moved = true;
|
||||
movedRegions.add(region);
|
||||
}
|
||||
return moved;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the
|
||||
* RS down anyways and not abort on a stuck region. Improves movement performance
|
||||
*/
|
||||
private class MoveWithoutAck implements Callable<Boolean> {
|
||||
private RegionInfo region;
|
||||
private ServerName targetServer;
|
||||
private List<RegionInfo> movedRegions;
|
||||
private ServerName sourceServer;
|
||||
|
||||
public MoveWithoutAck(RegionInfo regionInfo, ServerName sourceServer,
|
||||
ServerName targetServer, List<RegionInfo> movedRegions) {
|
||||
this.region = regionInfo;
|
||||
this.targetServer = targetServer;
|
||||
this.movedRegions = movedRegions;
|
||||
this.sourceServer = sourceServer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean call() {
|
||||
try {
|
||||
LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
|
||||
+ targetServer);
|
||||
admin.move(region.getEncodedNameAsBytes(), targetServer);
|
||||
LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to "
|
||||
+ targetServer);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error Moving Region:" + region.getEncodedName(), e);
|
||||
} finally {
|
||||
// we add region to the moved regions list in No Ack Mode since this is best effort
|
||||
movedRegions.add(region);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
|
||||
* Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
|
||||
|
@ -390,12 +288,12 @@ public class RegionMover extends AbstractHBaseTool implements Closeable {
|
|||
"Moving " + regionsToMove.size() + " regions to " + server + " using " + this.maxthreads
|
||||
+ " threads.Ack mode:" + this.ack);
|
||||
|
||||
ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
|
||||
final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
|
||||
List<Future<Boolean>> taskList = new ArrayList<>();
|
||||
int counter = 0;
|
||||
while (counter < regionsToMove.size()) {
|
||||
RegionInfo region = regionsToMove.get(counter);
|
||||
ServerName currentServer = getServerNameForRegion(region);
|
||||
ServerName currentServer = MoveWithAck.getServerNameForRegion(region, admin, conn);
|
||||
if (currentServer == null) {
|
||||
LOG.warn(
|
||||
"Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
|
||||
|
@ -408,12 +306,12 @@ public class RegionMover extends AbstractHBaseTool implements Closeable {
|
|||
continue;
|
||||
}
|
||||
if (ack) {
|
||||
Future<Boolean> task =
|
||||
moveRegionsPool.submit(new MoveWithAck(region, currentServer, server, movedRegions));
|
||||
Future<Boolean> task = moveRegionsPool
|
||||
.submit(new MoveWithAck(conn, region, currentServer, server, movedRegions));
|
||||
taskList.add(task);
|
||||
} else {
|
||||
Future<Boolean> task =
|
||||
moveRegionsPool.submit(new MoveWithoutAck(region, currentServer, server, movedRegions));
|
||||
Future<Boolean> task = moveRegionsPool
|
||||
.submit(new MoveWithoutAck(admin, region, currentServer, server, movedRegions));
|
||||
taskList.add(task);
|
||||
}
|
||||
counter++;
|
||||
|
@ -512,26 +410,23 @@ public class RegionMover extends AbstractHBaseTool implements Closeable {
|
|||
LOG.info("No Regions to move....Quitting now");
|
||||
break;
|
||||
}
|
||||
int counter = 0;
|
||||
LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to "
|
||||
+ regionServers.size() + " servers using " + this.maxthreads + " threads .Ack Mode:"
|
||||
+ ack);
|
||||
ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
|
||||
LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}",
|
||||
regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack);
|
||||
final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
|
||||
List<Future<Boolean>> taskList = new ArrayList<>();
|
||||
int serverIndex = 0;
|
||||
while (counter < regionsToMove.size()) {
|
||||
for (RegionInfo regionToMove : regionsToMove) {
|
||||
if (ack) {
|
||||
Future<Boolean> task = moveRegionsPool.submit(
|
||||
new MoveWithAck(regionsToMove.get(counter), server, regionServers.get(serverIndex),
|
||||
new MoveWithAck(conn, regionToMove, server, regionServers.get(serverIndex),
|
||||
movedRegions));
|
||||
taskList.add(task);
|
||||
} else {
|
||||
Future<Boolean> task = moveRegionsPool.submit(
|
||||
new MoveWithoutAck(regionsToMove.get(counter), server, regionServers.get(serverIndex),
|
||||
new MoveWithoutAck(admin, regionToMove, server, regionServers.get(serverIndex),
|
||||
movedRegions));
|
||||
taskList.add(task);
|
||||
}
|
||||
counter++;
|
||||
serverIndex = (serverIndex + 1) % regionServers.size();
|
||||
}
|
||||
moveRegionsPool.shutdown();
|
||||
|
@ -587,8 +482,13 @@ public class RegionMover extends AbstractHBaseTool implements Closeable {
|
|||
LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
|
||||
throw e;
|
||||
} catch (ExecutionException e) {
|
||||
LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e);
|
||||
boolean ignoreFailure = ignoreRegionMoveFailure(e);
|
||||
if (ignoreFailure) {
|
||||
LOG.debug("Ignore region move failure, it might have been split/merged.", e);
|
||||
} else {
|
||||
LOG.error("Got Exception From Thread While moving region {}", e.getMessage(), e);
|
||||
throw e;
|
||||
}
|
||||
} catch (CancellationException e) {
|
||||
LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
|
||||
+ "secs", e);
|
||||
|
@ -597,6 +497,20 @@ public class RegionMover extends AbstractHBaseTool implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean ignoreRegionMoveFailure(ExecutionException e) {
|
||||
boolean ignoreFailure = false;
|
||||
if (e.getCause() instanceof UnknownRegionException) {
|
||||
// region does not exist anymore
|
||||
ignoreFailure = true;
|
||||
} else if (e.getCause() instanceof DoNotRetryRegionException
|
||||
&& e.getCause().getMessage() != null && e.getCause().getMessage()
|
||||
.contains(AssignmentManager.UNEXPECTED_STATE_REGION + "state=SPLIT,")) {
|
||||
// region is recently split
|
||||
ignoreFailure = true;
|
||||
}
|
||||
return ignoreFailure;
|
||||
}
|
||||
|
||||
private ServerName getTargetServer() throws Exception {
|
||||
ServerName server = null;
|
||||
int maxWaitInSeconds =
|
||||
|
@ -747,54 +661,6 @@ public class RegionMover extends AbstractHBaseTool implements Closeable {
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to scan a row from passed region
|
||||
*/
|
||||
private void isSuccessfulScan(RegionInfo region) throws IOException {
|
||||
Scan scan = new Scan().withStartRow(region.getStartKey()).setRaw(true).setOneRowLimit()
|
||||
.setMaxResultSize(1L).setCaching(1).setFilter(new FirstKeyOnlyFilter())
|
||||
.setCacheBlocks(false);
|
||||
try (Table table = conn.getTable(region.getTable());
|
||||
ResultScanner scanner = table.getScanner(scan)) {
|
||||
scanner.next();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Could not scan region:" + region.getEncodedName(), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if passed region is still on serverName when we look at hbase:meta.
|
||||
* @return true if region is hosted on serverName otherwise false
|
||||
*/
|
||||
private boolean isSameServer(RegionInfo region, ServerName serverName)
|
||||
throws IOException {
|
||||
ServerName serverForRegion = getServerNameForRegion(region);
|
||||
if (serverForRegion != null && serverForRegion.equals(serverName)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get servername that is up in hbase:meta hosting the given region. this is hostname + port +
|
||||
* startcode comma-delimited. Can return null
|
||||
* @return regionServer hosting the given region
|
||||
*/
|
||||
private ServerName getServerNameForRegion(RegionInfo region) throws IOException {
|
||||
if (!admin.isTableEnabled(region.getTable())) {
|
||||
return null;
|
||||
}
|
||||
HRegionLocation loc =
|
||||
conn.getRegionLocator(region.getTable()).getRegionLocation(region.getStartKey(),
|
||||
region.getReplicaId(),true);
|
||||
if (loc != null) {
|
||||
return loc.getServerName();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void addOptions() {
|
||||
this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter.Predicate;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
|
@ -44,13 +43,16 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
|||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hbase.util.RegionMover.RegionMoverBuilder;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -59,13 +61,16 @@ import org.slf4j.LoggerFactory;
|
|||
* exclude functionality useful for rack decommissioning
|
||||
*/
|
||||
@Category({MiscTests.class, LargeTests.class})
|
||||
public class TestRegionMover {
|
||||
public class TestRegionMover1 {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRegionMover.class);
|
||||
HBaseClassTestRule.forClass(TestRegionMover1.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestRegionMover.class);
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestRegionMover1.class);
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
|
@ -82,17 +87,19 @@ public class TestRegionMover {
|
|||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
// Create a pre-split table just to populate some regions
|
||||
TableName tableName = TableName.valueOf("testRegionMover");
|
||||
Admin admin = TEST_UTIL.getAdmin();
|
||||
if (admin.tableExists(tableName)) {
|
||||
TEST_UTIL.deleteTable(tableName);
|
||||
}
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("fam1")).build();
|
||||
String startKey = "a";
|
||||
String endKey = "z";
|
||||
admin.createTable(tableDesc, Bytes.toBytes(startKey), Bytes.toBytes(endKey), 9);
|
||||
TEST_UTIL.getAdmin().createTable(tableDesc, Bytes.toBytes(startKey), Bytes.toBytes(endKey), 9);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
TEST_UTIL.getAdmin().disableTable(tableName);
|
||||
TEST_UTIL.getAdmin().deleteTable(tableName);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -260,7 +267,7 @@ public class TestRegionMover {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testRegionServerPort() {
|
||||
public void testRegionServerPort() throws Exception {
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
HRegionServer regionServer = cluster.getRegionServer(0);
|
||||
String rsName = regionServer.getServerName().getHostname();
|
|
@ -0,0 +1,212 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Tests for Region Mover Load/Unload functionality with and without ack mode and also to test
|
||||
* exclude functionality useful for rack decommissioning
|
||||
*/
|
||||
@Category({ MiscTests.class, LargeTests.class})
|
||||
public class TestRegionMover2 {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRegionMover2.class);
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestRegionMover2.class);
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniCluster(3);
|
||||
TEST_UTIL.getAdmin().balancerSwitch(false, true);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("fam1")).build();
|
||||
int startKey = 0;
|
||||
int endKey = 80000;
|
||||
TEST_UTIL.getAdmin().createTable(tableDesc, Bytes.toBytes(startKey), Bytes.toBytes(endKey), 9);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
TEST_UTIL.getAdmin().disableTable(tableName);
|
||||
TEST_UTIL.getAdmin().deleteTable(tableName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithMergedRegions() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
Admin admin = TEST_UTIL.getAdmin();
|
||||
Table table = TEST_UTIL.getConnection().getTable(tableName);
|
||||
List<Put> puts = new ArrayList<>();
|
||||
for (int i = 0; i < 10000; i++) {
|
||||
puts.add(new Put(Bytes.toBytes("rowkey_" + i))
|
||||
.addColumn(Bytes.toBytes("fam1"), Bytes.toBytes("q1"), Bytes.toBytes("val_" + i)));
|
||||
}
|
||||
table.put(puts);
|
||||
admin.flush(tableName);
|
||||
HRegionServer regionServer = cluster.getRegionServer(0);
|
||||
String rsName = regionServer.getServerName().getAddress().toString();
|
||||
int numRegions = regionServer.getNumberOfOnlineRegions();
|
||||
List<HRegion> hRegions = regionServer.getRegions().stream()
|
||||
.filter(hRegion -> hRegion.getRegionInfo().getTable().equals(tableName))
|
||||
.collect(Collectors.toList());
|
||||
RegionMover.RegionMoverBuilder rmBuilder =
|
||||
new RegionMover.RegionMoverBuilder(rsName, TEST_UTIL.getConfiguration()).ack(true)
|
||||
.maxthreads(8);
|
||||
try (RegionMover rm = rmBuilder.build()) {
|
||||
LOG.debug("Unloading {}", regionServer.getServerName());
|
||||
rm.unload();
|
||||
Assert.assertEquals(0, regionServer.getNumberOfOnlineRegions());
|
||||
LOG.debug("Successfully Unloaded, now Loading");
|
||||
admin.mergeRegionsAsync(new byte[][] { hRegions.get(0).getRegionInfo().getRegionName(),
|
||||
hRegions.get(1).getRegionInfo().getRegionName() }, true)
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
Assert.assertTrue(rm.load());
|
||||
Assert.assertEquals(numRegions - 2, regionServer.getNumberOfOnlineRegions());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithSplitRegions() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
Admin admin = TEST_UTIL.getAdmin();
|
||||
Table table = TEST_UTIL.getConnection().getTable(tableName);
|
||||
List<Put> puts = new ArrayList<>();
|
||||
for (int i = 10; i < 50000; i++) {
|
||||
puts.add(new Put(Bytes.toBytes(i))
|
||||
.addColumn(Bytes.toBytes("fam1"), Bytes.toBytes("q1"), Bytes.toBytes("val_" + i)));
|
||||
}
|
||||
table.put(puts);
|
||||
admin.flush(tableName);
|
||||
admin.compact(tableName);
|
||||
HRegionServer regionServer = cluster.getRegionServer(0);
|
||||
String rsName = regionServer.getServerName().getAddress().toString();
|
||||
int numRegions = regionServer.getNumberOfOnlineRegions();
|
||||
List<HRegion> hRegions = regionServer.getRegions().stream()
|
||||
.filter(hRegion -> hRegion.getRegionInfo().getTable().equals(tableName))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
RegionMover.RegionMoverBuilder rmBuilder =
|
||||
new RegionMover.RegionMoverBuilder(rsName, TEST_UTIL.getConfiguration()).ack(true)
|
||||
.maxthreads(8);
|
||||
try (RegionMover rm = rmBuilder.build()) {
|
||||
LOG.debug("Unloading {}", regionServer.getServerName());
|
||||
rm.unload();
|
||||
Assert.assertEquals(0, regionServer.getNumberOfOnlineRegions());
|
||||
LOG.debug("Successfully Unloaded, now Loading");
|
||||
HRegion hRegion = hRegions.get(1);
|
||||
int startKey = 0;
|
||||
int endKey = Integer.MAX_VALUE;
|
||||
if (hRegion.getRegionInfo().getStartKey().length > 0) {
|
||||
startKey = Bytes.toInt(hRegion.getRegionInfo().getStartKey());
|
||||
}
|
||||
if (hRegion.getRegionInfo().getEndKey().length > 0) {
|
||||
endKey = Bytes.toInt(hRegion.getRegionInfo().getEndKey());
|
||||
}
|
||||
int midKey = startKey + (endKey - startKey) / 2;
|
||||
admin.splitRegionAsync(hRegion.getRegionInfo().getRegionName(), Bytes.toBytes(midKey))
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
Assert.assertTrue(rm.load());
|
||||
Assert.assertEquals(numRegions - 1, regionServer.getNumberOfOnlineRegions());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailedRegionMove() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
Admin admin = TEST_UTIL.getAdmin();
|
||||
Table table = TEST_UTIL.getConnection().getTable(tableName);
|
||||
List<Put> puts = new ArrayList<>();
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
puts.add(new Put(Bytes.toBytes("rowkey_" + i))
|
||||
.addColumn(Bytes.toBytes("fam1"), Bytes.toBytes("q1"), Bytes.toBytes("val_" + i)));
|
||||
}
|
||||
table.put(puts);
|
||||
admin.flush(tableName);
|
||||
HRegionServer regionServer = cluster.getRegionServer(0);
|
||||
String rsName = regionServer.getServerName().getAddress().toString();
|
||||
int numRegions = regionServer.getNumberOfOnlineRegions();
|
||||
List<HRegion> hRegions = regionServer.getRegions().stream()
|
||||
.filter(hRegion -> hRegion.getRegionInfo().getTable().equals(tableName))
|
||||
.collect(Collectors.toList());
|
||||
RegionMover.RegionMoverBuilder rmBuilder =
|
||||
new RegionMover.RegionMoverBuilder(rsName, TEST_UTIL.getConfiguration()).ack(true)
|
||||
.maxthreads(8);
|
||||
try (RegionMover rm = rmBuilder.build()) {
|
||||
LOG.debug("Unloading {}", regionServer.getServerName());
|
||||
rm.unload();
|
||||
Assert.assertEquals(0, regionServer.getNumberOfOnlineRegions());
|
||||
LOG.debug("Successfully Unloaded, now Loading");
|
||||
admin.offline(hRegions.get(0).getRegionInfo().getRegionName());
|
||||
// loading regions will fail because of offline region
|
||||
Assert.assertFalse(rm.load());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue