HBASE-5877 When a query fails because the region has moved, let the regionserver return the new address to the client (N Keywal)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1336301 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-05-09 17:07:49 +00:00
parent fc6a9cf796
commit 5edfc1996d
15 changed files with 824 additions and 123 deletions

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.RemoteException;
/**
 * Thrown by a region server when it knows the region it was asked for has moved to
 * another server. Carries the new server's hostname and port so the client can go
 * directly to the new region server without a round trip to the master.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RegionMovedException extends NotServingRegionException {
  private static final Log LOG = LogFactory.getLog(RegionMovedException.class);
  private static final long serialVersionUID = -7232903522310558397L;

  // New location of the region. hostname is null / port is -1 when the exception
  // was rebuilt from a message that could not be parsed.
  private final String hostname;
  private final int port;

  // Field markers used to serialize the location into the message, see getMessage().
  private static final String HOST_FIELD = "hostname=";
  private static final String PORT_FIELD = "port=";

  /**
   * @param hostname hostname of the region server now hosting the region
   * @param port port of the region server now hosting the region
   */
  public RegionMovedException(final String hostname, final int port) {
    super();
    this.hostname = hostname;
    this.port = port;
  }

  /** @return the new server's hostname, or null if the message could not be parsed. */
  public String getHostname() {
    return hostname;
  }

  /** @return the new server's port, or -1 if the message could not be parsed. */
  public int getPort() {
    return port;
  }

  /**
   * For hadoop.ipc internal call. Do NOT use.
   * We have to parse the hostname to recreate the exception.
   * The input is the one generated by {@link #getMessage()}
   */
  public RegionMovedException(String s) {
    int posHostname = s.indexOf(HOST_FIELD) + HOST_FIELD.length();
    int posPort = s.indexOf(PORT_FIELD) + PORT_FIELD.length();

    String tmpHostname = null;
    int tmpPort = -1;
    try {
      // The hostname ends at the space separating it from the port field; the port
      // value runs to the end of the message. Any malformed input (missing fields,
      // non-numeric port) lands in the catch below and leaves the defaults in place.
      tmpHostname = s.substring(posHostname, s.indexOf(' ', posHostname));
      tmpPort = Integer.parseInt(s.substring(posPort));
    } catch (Exception ignored) {
      LOG.warn("Can't parse the hostname and the port from this string: " + s + ", "+
        "Continuing");
    }

    hostname = tmpHostname;
    port = tmpPort;
  }

  @Override
  public String getMessage() {
    return "Region moved to: " + HOST_FIELD + hostname + " " + PORT_FIELD + port;
  }

  /**
   * Look for a RegionMovedException in the exception:
   *  - hadoop.ipc wrapped exceptions
   *  - nested exceptions
   * Returns null if we didn't find the exception or if it was not readable.
   */
  public static RegionMovedException find(Object exception) {
    // instanceof is false for null, so this also rejects a null parameter.
    if (!(exception instanceof Throwable)) {
      return null;
    }

    RegionMovedException res = null;
    for (Throwable cur = (Throwable) exception; res == null && cur != null;
         cur = cur.getCause()) {
      if (cur instanceof RegionMovedException) {
        res = (RegionMovedException) cur;
      } else if (cur instanceof RemoteException) {
        RemoteException re = (RemoteException) cur;
        Exception e = re.unwrapRemoteException(RegionMovedException.class);
        if (e == null) {
          e = re.unwrapRemoteException();
        }
        // unwrapRemoteException can return the exception given as a parameter when it cannot
        // unwrap it. In this case, there is no need to look further
        // noinspection ObjectEquality
        if (e != re) {
          res = find(e);
        }
      }
    }

    if (res != null && (res.getPort() < 0 || res.getHostname() == null)) {
      // We failed to parse the exception. Act as if we did not find the exception.
      return null;
    }
    return res;
  }
}

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.RegionMovedException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
@ -1716,6 +1717,81 @@ public class HConnectionManager {
};
}
/**
 * Replaces the cached location of a region with a new hostname:port, keeping the
 * same region info. The write happens under the shared cache lock.
 */
void updateCachedLocation(HRegionLocation hrl, String hostname, int port) {
  // Build the replacement entry first, then swap it in while holding the lock.
  final HRegionLocation updated =
    new HRegionLocation(hrl.getRegionInfo(), hostname, port);
  synchronized (this.cachedRegionLocations) {
    cacheLocation(hrl.getRegionInfo().getTableName(), updated);
  }
}
/**
 * Drops the cached entry for the given region. Entries are keyed by the region's
 * start key within its table's location map.
 */
void deleteCachedLocation(HRegionLocation rl) {
  synchronized (this.cachedRegionLocations) {
    getTableLocations(rl.getRegionInfo().getTableName())
      .remove(rl.getRegionInfo().getStartKey());
  }
}
// Convenience overload: update or delete by location only (no table name / row needed).
private void updateCachedLocations(
  UpdateHistory updateHistory, HRegionLocation hrl, Object t) {
  updateCachedLocations(updateHistory, hrl, null, null, t);
}
// Convenience overload: update or delete by table name and row, when no
// HRegionLocation is at hand (the location is looked up from the cache).
private void updateCachedLocations(UpdateHistory updateHistory, byte[] tableName, Row row,
  Object t) {
  updateCachedLocations(updateHistory, null, tableName, row, t);
}
/**
 * Update the location with the new value (if the exception is a RegionMovedException) or delete
 * it from the cache.
 * We need to keep a history of the modifications, because we can have first an update then a
 * delete. The delete would remove the update.
 * @param updateHistory - The set used for the history
 * @param hrl - can be null. If it's the case, tableName and row should not be null
 * @param tableName - can be null if hrl is not null.
 * @param row - can be null if hrl is not null.
 * @param exception - An object (to simplify user code) on which we will try to find a nested
 *   or wrapped or both RegionMovedException
 */
private void updateCachedLocations(
  UpdateHistory updateHistory, final HRegionLocation hrl, final byte[] tableName,
  Row row, final Object exception) {
  if ((row == null || tableName == null) && hrl == null) {
    LOG.warn("Coding error, see method javadoc. row=" + row + ", tableName=" +
      Bytes.toString(tableName) + ", hrl=" + hrl);
    return;
  }

  // Is it something we have already updated?
  final HRegionLocation myLoc = (hrl != null ?
    hrl : getCachedLocation(tableName, row.getRow()));
  if (myLoc == null) {
    // There is no such location in the cache => it's been removed already => nothing to do
    return;
  }

  final String regionName = myLoc.getRegionInfo().getRegionNameAsString();
  if (updateHistory.contains(regionName)) {
    // Already updated/deleted => nothing to do
    return;
  }
  updateHistory.add(regionName);

  final RegionMovedException rme = RegionMovedException.find(exception);
  if (rme != null) {
    // Reuse the already-computed regionName instead of recomputing it for the log line.
    LOG.info("Region " + regionName + " moved from " +
      myLoc.getHostnamePort() + ", updating client location cache." +
      " New server: " + rme.getHostname() + ":" + rme.getPort());
    updateCachedLocation(myLoc, rme.getHostname(), rme.getPort());
  } else {
    deleteCachedLocation(myLoc);
  }
}
@Override
@Deprecated
public void processBatch(List<? extends Row> list,
@ -1799,6 +1875,19 @@ public class HConnectionManager {
}
}
/**
 * Remembers which regions already had their cached location updated or deleted
 * during one retry round, so a later failure does not undo an earlier update.
 */
private static class UpdateHistory {
  // Initial size covers the worst case of a rolling restart, where we may have
  // around 100 regions with a stale location if we're really unlucky.
  private final Set<String> updatedRegions = new HashSet<String>(100);

  /** @return true if this region was already handled in this round. */
  public boolean contains(String regionName) {
    return updatedRegions.contains(regionName);
  }

  /** Marks this region as handled for the rest of the round. */
  public void add(String regionName) {
    updatedRegions.add(regionName);
  }
}
/**
* Parameterized batch processing, allowing varying return types for
* different {@link Row} implementations.
@ -1833,6 +1922,7 @@ public class HConnectionManager {
int actionCount = 0;
for (int tries = 0; tries < numRetries && retry; ++tries) {
UpdateHistory updateHistory = new UpdateHistory();
// sleep first, if this is a retry
if (tries >= 1) {
@ -1910,6 +2000,7 @@ public class HConnectionManager {
}
} catch (ExecutionException e) {
LOG.debug("Failed all from " + loc, e);
updateCachedLocations(updateHistory, loc, e);
}
}
@ -1931,7 +2022,7 @@ public class HConnectionManager {
actionCount++;
Row row = list.get(i);
workingList.add(row);
deleteCachedLocation(tableName, row.getRow());
updateCachedLocations(updateHistory, tableName, row, results[i]);
} else {
if (results[i] != null && results[i] instanceof Throwable) {
actionCount++;

View File

@ -1943,7 +1943,7 @@ public class AssignmentManager extends ZooKeeperListener {
* @param region region to be unassigned
* @param force if region should be closed even if already closing
*/
public void unassign(HRegionInfo region, boolean force) {
public void unassign(HRegionInfo region, boolean force, ServerName dest) {
// TODO: Method needs refactoring. Ugly buried returns throughout. Beware!
LOG.debug("Starting unassignment of region " +
region.getRegionNameAsString() + " (offlining)");
@ -2045,7 +2045,7 @@ public class AssignmentManager extends ZooKeeperListener {
// TODO: We should consider making this look more like it does for the
// region open where we catch all throwables and never abort
if (serverManager.sendRegionClose(server, state.getRegion(),
versionOfClosingNode)) {
versionOfClosingNode, dest)) {
LOG.debug("Sent CLOSE to " + server + " for region " +
region.getRegionNameAsString());
return;
@ -2086,11 +2086,15 @@ public class AssignmentManager extends ZooKeeperListener {
state.update(state.getState());
}
LOG.info("Server " + server + " returned " + t + " for " +
region.getRegionNameAsString());
region.getRegionNameAsString(), t);
// Presume retry or server will expire.
}
}
// Convenience overload: unassign with no preferred destination server.
public void unassign(HRegionInfo region, boolean force){
  unassign(region, force, null);
}
private void deleteClosingOrClosedNode(HRegionInfo region) {
try {
if (!ZKAssign.deleteNode(master.getZooKeeper(), region.getEncodedName(),
@ -3228,7 +3232,7 @@ public class AssignmentManager extends ZooKeeperListener {
synchronized (this.regionPlans) {
this.regionPlans.put(plan.getRegionName(), plan);
}
unassign(plan.getRegionInfo());
unassign(plan.getRegionInfo(), false, plan.getDestination());
}
/**

View File

@ -528,11 +528,12 @@ public class ServerManager {
* @param versionOfClosingNode
* the version of znode to compare when RS transitions the znode from
* CLOSING state.
* @param dest - if the region is moved to another server, the destination server. null otherwise.
* @return true if server acknowledged close, false if not
* @throws IOException
*/
public boolean sendRegionClose(ServerName server, HRegionInfo region,
int versionOfClosingNode) throws IOException {
int versionOfClosingNode, ServerName dest) throws IOException {
if (server == null) throw new NullPointerException("Passed server is null");
AdminProtocol admin = getServerConnection(server);
if (admin == null) {
@ -542,7 +543,12 @@ public class ServerManager {
" failed because no RPC connection found to this server");
}
return ProtobufUtil.closeRegion(admin, region.getRegionName(),
versionOfClosingNode);
versionOfClosingNode, dest);
}
// Convenience overload: close with no destination server (plain unassign, not a move).
public boolean sendRegionClose(ServerName server, HRegionInfo region,
  int versionOfClosingNode) throws IOException {
  return sendRegionClose(server, region, versionOfClosingNode, null);
}
/**

View File

@ -1282,6 +1282,29 @@ public final class ProtobufUtil {
}
}
/**
 * A helper to close a region given a region name
 * using admin protocol.
 *
 * @param admin the admin protocol endpoint of the server hosting the region
 * @param regionName the name of the region to close
 * @param versionOfClosingNode the version of znode to compare when the region server
 *   transitions the znode to CLOSING state
 * @param destinationServer if the region is being moved, the destination server;
 *   null otherwise
 * @return true if the region is closed
 * @throws IOException if the remote call fails
 */
public static boolean closeRegion(final AdminProtocol admin, final byte[] regionName,
  final int versionOfClosingNode, final ServerName destinationServer) throws IOException {
  CloseRegionRequest closeRegionRequest =
    RequestConverter.buildCloseRegionRequest(regionName, versionOfClosingNode, destinationServer);
  try {
    CloseRegionResponse response = admin.closeRegion(null, closeRegionRequest);
    return ResponseConverter.isClosed(response);
  } catch (ServiceException se) {
    // Unwrap the protobuf ServiceException into the underlying IOException.
    throw getRemoteException(se);
  }
}
/**
* A helper to open a region using admin protocol.
*

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
@ -643,6 +644,19 @@ public final class RequestConverter {
return builder.build();
}
/**
 * Create a protocol buffer CloseRegionRequest for a given region name.
 *
 * @param regionName the name of the region to close
 * @param versionOfClosingNode the version of znode to compare when the region server
 *   transitions the znode to CLOSING state
 * @param destinationServer if the region is being moved, the destination server;
 *   null otherwise (the optional protobuf field is then left unset)
 * @return a CloseRegionRequest
 */
public static CloseRegionRequest buildCloseRegionRequest(
  final byte[] regionName, final int versionOfClosingNode, ServerName destinationServer) {
  CloseRegionRequest.Builder builder = CloseRegionRequest.newBuilder();
  RegionSpecifier region = buildRegionSpecifier(
    RegionSpecifierType.REGION_NAME, regionName);
  builder.setRegion(region);
  builder.setVersionOfClosingNode(versionOfClosingNode);
  if (destinationServer != null) {
    builder.setDestinationServer(ProtobufUtil.toServerName(destinationServer));
  }
  return builder.build();
}
/**
* Create a CloseRegionRequest for a given encoded region name
*

View File

@ -4031,6 +4031,11 @@ public final class AdminProtos {
// optional bool transitionInZK = 3 [default = true];
boolean hasTransitionInZK();
boolean getTransitionInZK();
// optional .ServerName destinationServer = 4;
boolean hasDestinationServer();
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getDestinationServer();
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getDestinationServerOrBuilder();
}
public static final class CloseRegionRequest extends
com.google.protobuf.GeneratedMessage
@ -4094,10 +4099,24 @@ public final class AdminProtos {
return transitionInZK_;
}
// optional .ServerName destinationServer = 4;
public static final int DESTINATIONSERVER_FIELD_NUMBER = 4;
private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName destinationServer_;
public boolean hasDestinationServer() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getDestinationServer() {
return destinationServer_;
}
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getDestinationServerOrBuilder() {
return destinationServer_;
}
private void initFields() {
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
versionOfClosingNode_ = 0;
transitionInZK_ = true;
destinationServer_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -4112,6 +4131,12 @@ public final class AdminProtos {
memoizedIsInitialized = 0;
return false;
}
if (hasDestinationServer()) {
if (!getDestinationServer().isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
}
memoizedIsInitialized = 1;
return true;
}
@ -4128,6 +4153,9 @@ public final class AdminProtos {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBool(3, transitionInZK_);
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeMessage(4, destinationServer_);
}
getUnknownFields().writeTo(output);
}
@ -4149,6 +4177,10 @@ public final class AdminProtos {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(3, transitionInZK_);
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(4, destinationServer_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -4187,6 +4219,11 @@ public final class AdminProtos {
result = result && (getTransitionInZK()
== other.getTransitionInZK());
}
result = result && (hasDestinationServer() == other.hasDestinationServer());
if (hasDestinationServer()) {
result = result && getDestinationServer()
.equals(other.getDestinationServer());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -4208,6 +4245,10 @@ public final class AdminProtos {
hash = (37 * hash) + TRANSITIONINZK_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getTransitionInZK());
}
if (hasDestinationServer()) {
hash = (37 * hash) + DESTINATIONSERVER_FIELD_NUMBER;
hash = (53 * hash) + getDestinationServer().hashCode();
}
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
@ -4317,6 +4358,7 @@ public final class AdminProtos {
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getRegionFieldBuilder();
getDestinationServerFieldBuilder();
}
}
private static Builder create() {
@ -4335,6 +4377,12 @@ public final class AdminProtos {
bitField0_ = (bitField0_ & ~0x00000002);
transitionInZK_ = true;
bitField0_ = (bitField0_ & ~0x00000004);
if (destinationServerBuilder_ == null) {
destinationServer_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
} else {
destinationServerBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@ -4389,6 +4437,14 @@ public final class AdminProtos {
to_bitField0_ |= 0x00000004;
}
result.transitionInZK_ = transitionInZK_;
if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
to_bitField0_ |= 0x00000008;
}
if (destinationServerBuilder_ == null) {
result.destinationServer_ = destinationServer_;
} else {
result.destinationServer_ = destinationServerBuilder_.build();
}
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -4414,6 +4470,9 @@ public final class AdminProtos {
if (other.hasTransitionInZK()) {
setTransitionInZK(other.getTransitionInZK());
}
if (other.hasDestinationServer()) {
mergeDestinationServer(other.getDestinationServer());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -4427,6 +4486,12 @@ public final class AdminProtos {
return false;
}
if (hasDestinationServer()) {
if (!getDestinationServer().isInitialized()) {
return false;
}
}
return true;
}
@ -4472,6 +4537,15 @@ public final class AdminProtos {
transitionInZK_ = input.readBool();
break;
}
case 34: {
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder subBuilder = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder();
if (hasDestinationServer()) {
subBuilder.mergeFrom(getDestinationServer());
}
input.readMessage(subBuilder, extensionRegistry);
setDestinationServer(subBuilder.buildPartial());
break;
}
}
}
}
@ -4610,6 +4684,96 @@ public final class AdminProtos {
return this;
}
// optional .ServerName destinationServer = 4;
private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName destinationServer_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> destinationServerBuilder_;
public boolean hasDestinationServer() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getDestinationServer() {
if (destinationServerBuilder_ == null) {
return destinationServer_;
} else {
return destinationServerBuilder_.getMessage();
}
}
public Builder setDestinationServer(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) {
if (destinationServerBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
destinationServer_ = value;
onChanged();
} else {
destinationServerBuilder_.setMessage(value);
}
bitField0_ |= 0x00000008;
return this;
}
public Builder setDestinationServer(
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder builderForValue) {
if (destinationServerBuilder_ == null) {
destinationServer_ = builderForValue.build();
onChanged();
} else {
destinationServerBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000008;
return this;
}
public Builder mergeDestinationServer(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) {
if (destinationServerBuilder_ == null) {
if (((bitField0_ & 0x00000008) == 0x00000008) &&
destinationServer_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance()) {
destinationServer_ =
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(destinationServer_).mergeFrom(value).buildPartial();
} else {
destinationServer_ = value;
}
onChanged();
} else {
destinationServerBuilder_.mergeFrom(value);
}
bitField0_ |= 0x00000008;
return this;
}
public Builder clearDestinationServer() {
if (destinationServerBuilder_ == null) {
destinationServer_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
onChanged();
} else {
destinationServerBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder getDestinationServerBuilder() {
bitField0_ |= 0x00000008;
onChanged();
return getDestinationServerFieldBuilder().getBuilder();
}
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getDestinationServerOrBuilder() {
if (destinationServerBuilder_ != null) {
return destinationServerBuilder_.getMessageOrBuilder();
} else {
return destinationServer_;
}
}
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>
getDestinationServerFieldBuilder() {
if (destinationServerBuilder_ == null) {
destinationServerBuilder_ = new com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>(
destinationServer_,
getParentForChildren(),
isClean());
destinationServer_ = null;
}
return destinationServerBuilder_;
}
// @@protoc_insertion_point(builder_scope:CloseRegionRequest)
}
@ -15511,62 +15675,63 @@ public final class AdminProtos {
"esponse\022<\n\014openingState\030\001 \003(\0162&.OpenRegi" +
"onResponse.RegionOpeningState\"H\n\022RegionO" +
"peningState\022\n\n\006OPENED\020\000\022\022\n\016ALREADY_OPENE" +
"D\020\001\022\022\n\016FAILED_OPENING\020\002\"r\n\022CloseRegionRe" +
"quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" +
"\034\n\024versionOfClosingNode\030\002 \001(\r\022\034\n\016transit" +
"ionInZK\030\003 \001(\010:\004true\"%\n\023CloseRegionRespon" +
"se\022\016\n\006closed\030\001 \002(\010\"M\n\022FlushRegionRequest" +
"\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\025\n\rif",
"OlderThanTs\030\002 \001(\004\"=\n\023FlushRegionResponse" +
"\022\025\n\rlastFlushTime\030\001 \002(\004\022\017\n\007flushed\030\002 \001(\010" +
"\"J\n\022SplitRegionRequest\022 \n\006region\030\001 \002(\0132\020" +
".RegionSpecifier\022\022\n\nsplitPoint\030\002 \001(\014\"\025\n\023" +
"SplitRegionResponse\"G\n\024CompactRegionRequ" +
"est\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\r\n" +
"\005major\030\002 \001(\010\"\027\n\025CompactRegionResponse\"1\n" +
"\004UUID\022\024\n\014leastSigBits\030\001 \002(\004\022\023\n\013mostSigBi" +
"ts\030\002 \002(\004\"\270\003\n\010WALEntry\022\035\n\003key\030\001 \002(\0132\020.WAL" +
"Entry.WALKey\022\037\n\004edit\030\002 \002(\0132\021.WALEntry.WA",
"LEdit\032~\n\006WALKey\022\031\n\021encodedRegionName\030\001 \002" +
"(\014\022\021\n\ttableName\030\002 \002(\014\022\031\n\021logSequenceNumb" +
"er\030\003 \002(\004\022\021\n\twriteTime\030\004 \002(\004\022\030\n\tclusterId" +
"\030\005 \001(\0132\005.UUID\032\353\001\n\007WALEdit\022\025\n\rkeyValueByt" +
"es\030\001 \003(\014\0222\n\013familyScope\030\002 \003(\0132\035.WALEntry" +
".WALEdit.FamilyScope\032M\n\013FamilyScope\022\016\n\006f" +
"amily\030\001 \002(\014\022.\n\tscopeType\030\002 \002(\0162\033.WALEntr" +
"y.WALEdit.ScopeType\"F\n\tScopeType\022\033\n\027REPL" +
"ICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCO" +
"PE_GLOBAL\020\001\"4\n\030ReplicateWALEntryRequest\022",
"\030\n\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031ReplicateWA" +
"LEntryResponse\"\026\n\024RollWALWriterRequest\"." +
"\n\025RollWALWriterResponse\022\025\n\rregionToFlush" +
"\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006reason\030\001 " +
"\002(\t\"\024\n\022StopServerResponse\"\026\n\024GetServerIn" +
"foRequest\"@\n\nServerInfo\022\037\n\nserverName\030\001 " +
"\002(\0132\013.ServerName\022\021\n\twebuiPort\030\002 \001(\r\"8\n\025G" +
"etServerInfoResponse\022\037\n\nserverInfo\030\001 \002(\013" +
"2\013.ServerInfo2\371\005\n\014AdminService\022>\n\rgetReg" +
"ionInfo\022\025.GetRegionInfoRequest\032\026.GetRegi",
"onInfoResponse\022;\n\014getStoreFile\022\024.GetStor" +
"eFileRequest\032\025.GetStoreFileResponse\022D\n\017g" +
"etOnlineRegion\022\027.GetOnlineRegionRequest\032" +
"\030.GetOnlineRegionResponse\0225\n\nopenRegion\022" +
"\022.OpenRegionRequest\032\023.OpenRegionResponse" +
"\0228\n\013closeRegion\022\023.CloseRegionRequest\032\024.C" +
"loseRegionResponse\0228\n\013flushRegion\022\023.Flus" +
"hRegionRequest\032\024.FlushRegionResponse\0228\n\013" +
"splitRegion\022\023.SplitRegionRequest\032\024.Split" +
"RegionResponse\022>\n\rcompactRegion\022\025.Compac",
"tRegionRequest\032\026.CompactRegionResponse\022J" +
"\n\021replicateWALEntry\022\031.ReplicateWALEntryR" +
"equest\032\032.ReplicateWALEntryResponse\022>\n\rro" +
"llWALWriter\022\025.RollWALWriterRequest\032\026.Rol" +
"lWALWriterResponse\022>\n\rgetServerInfo\022\025.Ge" +
"tServerInfoRequest\032\026.GetServerInfoRespon" +
"se\0225\n\nstopServer\022\022.StopServerRequest\032\023.S" +
"topServerResponseBA\n*org.apache.hadoop.h" +
"base.protobuf.generatedB\013AdminProtosH\001\210\001" +
"\001\240\001\001"
"D\020\001\022\022\n\016FAILED_OPENING\020\002\"\232\001\n\022CloseRegionR" +
"equest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier" +
"\022\034\n\024versionOfClosingNode\030\002 \001(\r\022\034\n\016transi" +
"tionInZK\030\003 \001(\010:\004true\022&\n\021destinationServe" +
"r\030\004 \001(\0132\013.ServerName\"%\n\023CloseRegionRespo" +
"nse\022\016\n\006closed\030\001 \002(\010\"M\n\022FlushRegionReques",
"t\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\025\n\ri" +
"fOlderThanTs\030\002 \001(\004\"=\n\023FlushRegionRespons" +
"e\022\025\n\rlastFlushTime\030\001 \002(\004\022\017\n\007flushed\030\002 \001(" +
"\010\"J\n\022SplitRegionRequest\022 \n\006region\030\001 \002(\0132" +
"\020.RegionSpecifier\022\022\n\nsplitPoint\030\002 \001(\014\"\025\n" +
"\023SplitRegionResponse\"G\n\024CompactRegionReq" +
"uest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\r" +
"\n\005major\030\002 \001(\010\"\027\n\025CompactRegionResponse\"1" +
"\n\004UUID\022\024\n\014leastSigBits\030\001 \002(\004\022\023\n\013mostSigB" +
"its\030\002 \002(\004\"\270\003\n\010WALEntry\022\035\n\003key\030\001 \002(\0132\020.WA",
"LEntry.WALKey\022\037\n\004edit\030\002 \002(\0132\021.WALEntry.W" +
"ALEdit\032~\n\006WALKey\022\031\n\021encodedRegionName\030\001 " +
"\002(\014\022\021\n\ttableName\030\002 \002(\014\022\031\n\021logSequenceNum" +
"ber\030\003 \002(\004\022\021\n\twriteTime\030\004 \002(\004\022\030\n\tclusterI" +
"d\030\005 \001(\0132\005.UUID\032\353\001\n\007WALEdit\022\025\n\rkeyValueBy" +
"tes\030\001 \003(\014\0222\n\013familyScope\030\002 \003(\0132\035.WALEntr" +
"y.WALEdit.FamilyScope\032M\n\013FamilyScope\022\016\n\006" +
"family\030\001 \002(\014\022.\n\tscopeType\030\002 \002(\0162\033.WALEnt" +
"ry.WALEdit.ScopeType\"F\n\tScopeType\022\033\n\027REP" +
"LICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SC",
"OPE_GLOBAL\020\001\"4\n\030ReplicateWALEntryRequest" +
"\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031ReplicateW" +
"ALEntryResponse\"\026\n\024RollWALWriterRequest\"" +
".\n\025RollWALWriterResponse\022\025\n\rregionToFlus" +
"h\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006reason\030\001" +
" \002(\t\"\024\n\022StopServerResponse\"\026\n\024GetServerI" +
"nfoRequest\"@\n\nServerInfo\022\037\n\nserverName\030\001" +
" \002(\0132\013.ServerName\022\021\n\twebuiPort\030\002 \001(\r\"8\n\025" +
"GetServerInfoResponse\022\037\n\nserverInfo\030\001 \002(" +
"\0132\013.ServerInfo2\371\005\n\014AdminService\022>\n\rgetRe",
"gionInfo\022\025.GetRegionInfoRequest\032\026.GetReg" +
"ionInfoResponse\022;\n\014getStoreFile\022\024.GetSto" +
"reFileRequest\032\025.GetStoreFileResponse\022D\n\017" +
"getOnlineRegion\022\027.GetOnlineRegionRequest" +
"\032\030.GetOnlineRegionResponse\0225\n\nopenRegion" +
"\022\022.OpenRegionRequest\032\023.OpenRegionRespons" +
"e\0228\n\013closeRegion\022\023.CloseRegionRequest\032\024." +
"CloseRegionResponse\0228\n\013flushRegion\022\023.Flu" +
"shRegionRequest\032\024.FlushRegionResponse\0228\n" +
"\013splitRegion\022\023.SplitRegionRequest\032\024.Spli",
"tRegionResponse\022>\n\rcompactRegion\022\025.Compa" +
"ctRegionRequest\032\026.CompactRegionResponse\022" +
"J\n\021replicateWALEntry\022\031.ReplicateWALEntry" +
"Request\032\032.ReplicateWALEntryResponse\022>\n\rr" +
"ollWALWriter\022\025.RollWALWriterRequest\032\026.Ro" +
"llWALWriterResponse\022>\n\rgetServerInfo\022\025.G" +
"etServerInfoRequest\032\026.GetServerInfoRespo" +
"nse\0225\n\nstopServer\022\022.StopServerRequest\032\023." +
"StopServerResponseBA\n*org.apache.hadoop." +
"hbase.protobuf.generatedB\013AdminProtosH\001\210",
"\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -15642,7 +15807,7 @@ public final class AdminProtos {
internal_static_CloseRegionRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_CloseRegionRequest_descriptor,
new java.lang.String[] { "Region", "VersionOfClosingNode", "TransitionInZK", },
new java.lang.String[] { "Region", "VersionOfClosingNode", "TransitionInZK", "DestinationServer", },
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest.class,
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest.Builder.class);
internal_static_CloseRegionResponse_descriptor =

View File

@ -39,6 +39,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -73,6 +74,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionMovedException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
@ -416,6 +418,12 @@ public class HRegionServer implements ClientProtocol,
*/
private ObjectName mxBean = null;
/**
* Chore to clean periodically the moved region list
*/
private MovedRegionsCleaner movedRegionsCleaner;
/**
* Starts a HRegionServer at the default location
*
@ -709,6 +717,9 @@ public class HRegionServer implements ClientProtocol,
thriftServer.start();
LOG.info("Started Thrift API from Region Server.");
}
// Create the thread to clean the moved regions list
movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this);
}
/**
@ -805,6 +816,8 @@ public class HRegionServer implements ClientProtocol,
cacheConfig.getBlockCache().shutdown();
}
movedRegionsCleaner.stop("Region Server stopping");
// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
@ -2048,21 +2061,6 @@ public class HRegionServer implements ClientProtocol,
this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
}
@Override
public boolean removeFromOnlineRegions(final String encodedName) {
HRegion toReturn = null;
toReturn = this.onlineRegions.remove(encodedName);
//Clear all of the dynamic metrics as they are now probably useless.
//This is a clear because dynamic metrics could include metrics per cf and
//per hfile. Figuring out which cfs, hfiles, and regions are still relevant to
//this region server would be an onerous task. Instead just clear everything
//and on the next tick of the metrics everything that is still relevant will be
//re-added.
this.dynamicMetrics.clear();
return toReturn != null;
}
/**
* @return A new Map of online regions sorted by region size with the first
* entry being the biggest.
@ -2491,7 +2489,7 @@ public class HRegionServer implements ClientProtocol,
*/
protected boolean closeRegion(HRegionInfo region, final boolean abort,
final boolean zk) {
return closeRegion(region, abort, zk, -1);
return closeRegion(region, abort, zk, -1, null);
}
/**
@ -2506,7 +2504,7 @@ public class HRegionServer implements ClientProtocol,
* @return True if closed a region.
*/
protected boolean closeRegion(HRegionInfo region, final boolean abort,
final boolean zk, final int versionOfClosingNode) {
final boolean zk, final int versionOfClosingNode, ServerName sn) {
if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) {
LOG.warn("Received close for region we are already opening or closing; " +
region.getEncodedName());
@ -2521,8 +2519,7 @@ public class HRegionServer implements ClientProtocol,
crh = new CloseMetaHandler(this, this, region, abort, zk,
versionOfClosingNode);
} else {
crh = new CloseRegionHandler(this, this, region, abort, zk,
versionOfClosingNode);
crh = new CloseRegionHandler(this, this, region, abort, zk, versionOfClosingNode, sn);
}
this.service.submit(crh);
return true;
@ -2543,6 +2540,25 @@ public class HRegionServer implements ClientProtocol,
return this.onlineRegions.get(encodedRegionName);
}
@Override
public boolean removeFromOnlineRegions(final String encodedRegionName, ServerName destination) {
HRegion toReturn = this.onlineRegions.remove(encodedRegionName);
if (destination != null){
addToMovedRegions(encodedRegionName, destination);
}
//Clear all of the dynamic metrics as they are now probably useless.
//This is a clear because dynamic metrics could include metrics per cf and
//per hfile. Figuring out which cfs, hfiles, and regions are still relevant to
//this region server would be an onerous task. Instead just clear everything
//and on the next tick of the metrics everything that is still relevant will be
//re-added.
this.dynamicMetrics.clear();
return toReturn != null;
}
/**
* Protected utility method for safely obtaining an HRegion handle.
*
@ -2553,11 +2569,21 @@ public class HRegionServer implements ClientProtocol,
*/
protected HRegion getRegion(final byte[] regionName)
throws NotServingRegionException {
HRegion region = null;
region = getOnlineRegion(regionName);
String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
return getRegionByEncodedName(encodedRegionName);
}
protected HRegion getRegionByEncodedName(String encodedRegionName)
throws NotServingRegionException {
HRegion region = this.onlineRegions.get(encodedRegionName);
if (region == null) {
throw new NotServingRegionException("Region is not online: " +
Bytes.toStringBinary(regionName));
ServerName sn = getMovedRegion(encodedRegionName);
if (sn != null) {
throw new RegionMovedException(sn.getHostname(), sn.getPort());
} else {
throw new NotServingRegionException("Region is not online: " + encodedRegionName);
}
}
return region;
}
@ -3396,7 +3422,7 @@ public class HRegionServer implements ClientProtocol,
} else {
LOG.warn("The region " + region.getEncodedName()
+ " is online on this server but META does not have this server.");
removeFromOnlineRegions(region.getEncodedName());
removeFromOnlineRegions(region.getEncodedName(), null);
}
}
LOG.info("Received request to open region: "
@ -3438,6 +3464,9 @@ public class HRegionServer implements ClientProtocol,
versionOfClosingNode = request.getVersionOfClosingNode();
}
boolean zk = request.getTransitionInZK();
final ServerName sn = (request.hasDestinationServer() ?
ProtobufUtil.toServerName(request.getDestinationServer()) : null);
try {
checkOpen();
requestCount.incrementAndGet();
@ -3445,11 +3474,12 @@ public class HRegionServer implements ClientProtocol,
CloseRegionResponse.Builder
builder = CloseRegionResponse.newBuilder();
LOG.info("Received close region: " + region.getRegionNameAsString() +
". Version of ZK closing node:" + versionOfClosingNode);
". Version of ZK closing node:" + versionOfClosingNode +
". Destination server:" + sn);
HRegionInfo regionInfo = region.getRegionInfo();
checkIfRegionInTransition(regionInfo, CLOSE);
boolean closed = closeRegion(
regionInfo, false, zk, versionOfClosingNode);
regionInfo, false, zk, versionOfClosingNode, sn);
builder.setClosed(closed);
return builder.build();
} catch (IOException ie) {
@ -3651,13 +3681,7 @@ public class HRegionServer implements ClientProtocol,
case REGION_NAME:
return getRegion(value);
case ENCODED_REGION_NAME:
String encodedRegionName = Bytes.toString(value);
HRegion region = this.onlineRegions.get(encodedRegionName);
if (region == null) {
throw new NotServingRegionException(
"Region is not online: " + encodedRegionName);
}
return region;
return getRegionByEncodedName(Bytes.toString(value));
default:
throw new DoNotRetryIOException(
"Unsupported region specifier type: " + type);
@ -3793,4 +3817,95 @@ public class HRegionServer implements ClientProtocol,
}
region.mutateRow(rm);
}
// This map will containsall the regions that we closed for a move.
// We add the time it was moved as we don't want to keep too old information
protected Map<String, Pair<Long, ServerName>> movedRegions =
new ConcurrentHashMap<String, Pair<Long, ServerName>>(3000);
// We need a timeout. If not there is a risk of giving a wrong information: this would double
// the number of network calls instead of reducing them.
private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
protected void addToMovedRegions(HRegionInfo hri, ServerName destination){
addToMovedRegions(hri.getEncodedName(), destination);
}
protected void addToMovedRegions(String encodedName, ServerName destination){
final Long time = System.currentTimeMillis();
movedRegions.put(
encodedName,
new Pair<Long, ServerName>(time, destination));
}
private ServerName getMovedRegion(final String encodedRegionName) {
Pair<Long, ServerName> dest = movedRegions.get(encodedRegionName);
if (dest != null) {
if (dest.getFirst() > (System.currentTimeMillis() - TIMEOUT_REGION_MOVED)) {
return dest.getSecond();
} else {
movedRegions.remove(encodedRegionName);
}
}
return null;
}
/**
* Remove the expired entries from the moved regions list.
*/
protected void cleanMovedRegions(){
final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
Iterator<Entry<String, Pair<Long, ServerName>>> it = movedRegions.entrySet().iterator();
while (it.hasNext()){
Map.Entry<String, Pair<Long, ServerName>> e = it.next();
if (e.getValue().getFirst() < cutOff){
it.remove();
}
}
}
/**
* Creates a Chore thread to clean the moved region cache.
*/
protected static class MovedRegionsCleaner extends Chore implements Stoppable {
private HRegionServer regionServer;
Stoppable stoppable;
private MovedRegionsCleaner(
HRegionServer regionServer, Stoppable stoppable){
super("MovedRegionsCleaner for region "+regionServer, TIMEOUT_REGION_MOVED, stoppable);
this.regionServer = regionServer;
this.stoppable = stoppable;
}
static MovedRegionsCleaner createAndStart(HRegionServer rs){
Stoppable stoppable = new Stoppable() {
private volatile boolean isStopped = false;
@Override public void stop(String why) { isStopped = true;}
@Override public boolean isStopped() {return isStopped;}
};
return new MovedRegionsCleaner(rs, stoppable);
}
@Override
protected void chore() {
regionServer.cleanMovedRegions();
}
@Override
public void stop(String why) {
stoppable.stop(why);
}
@Override
public boolean isStopped() {
return stoppable.isStopped();
}
}
}

View File

@ -23,7 +23,9 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
/**
* Interface to Map of online regions. In the Map, the key is the region's
@ -41,9 +43,10 @@ interface OnlineRegions extends Server {
* This method removes HRegion corresponding to hri from the Map of onlineRegions.
*
* @param encodedRegionName
* @param destination - destination, if any. Null otherwise
* @return True if we removed a region from online list.
*/
public boolean removeFromOnlineRegions(String encodedRegionName);
public boolean removeFromOnlineRegions(String encodedRegionName, ServerName destination);
/**
* Return {@link HRegion} instance.

View File

@ -274,7 +274,7 @@ public class SplitTransaction {
}
if (!testing) {
services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName(), null);
}
this.journal.add(JournalEntry.OFFLINED_PARENT);

View File

@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@ -60,11 +61,12 @@ public class CloseRegionHandler extends EventHandler {
// close -- not the master process so state up in zk will unlikely be
// CLOSING.
private final boolean zk;
private ServerName destination;
// This is executed after receiving an CLOSE RPC from the master.
public CloseRegionHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo) {
this(server, rsServices, regionInfo, false, true, -1);
this(server, rsServices, regionInfo, false, true, -1, EventType.M_RS_CLOSE_REGION, null);
}
/**
@ -80,13 +82,28 @@ public class CloseRegionHandler extends EventHandler {
final HRegionInfo regionInfo, final boolean abort, final boolean zk,
final int versionOfClosingNode) {
this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode,
EventType.M_RS_CLOSE_REGION);
EventType.M_RS_CLOSE_REGION, null);
}
public CloseRegionHandler(final Server server,
final RegionServerServices rsServices,
final HRegionInfo regionInfo, final boolean abort, final boolean zk,
final int versionOfClosingNode, ServerName destination) {
this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode,
EventType.M_RS_CLOSE_REGION, destination);
}
public CloseRegionHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo,
boolean abort, final boolean zk, final int versionOfClosingNode,
EventType eventType) {
this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode, eventType, null);
}
protected CloseRegionHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo,
boolean abort, final boolean zk, final int versionOfClosingNode,
EventType eventType) {
EventType eventType, ServerName destination) {
super(server, eventType);
this.server = server;
this.rsServices = rsServices;
@ -94,6 +111,7 @@ public class CloseRegionHandler extends EventHandler {
this.abort = abort;
this.zk = zk;
this.expectedVersion = versionOfClosingNode;
this.destination = destination;
}
public HRegionInfo getRegionInfo() {
@ -135,7 +153,7 @@ public class CloseRegionHandler extends EventHandler {
throw new RuntimeException(t);
}
this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName());
this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName(), destination);
if (this.zk) {
if (setClosedState(this.expectedVersion, region)) {
@ -183,5 +201,4 @@ public class CloseRegionHandler extends EventHandler {
}
return true;
}
}

View File

@ -77,6 +77,7 @@ message CloseRegionRequest {
required RegionSpecifier region = 1;
optional uint32 versionOfClosingNode = 2;
optional bool transitionInZK = 3 [default = true];
optional ServerName destinationServer = 4;
}
message CloseRegionResponse {

View File

@ -24,6 +24,8 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@ -36,6 +38,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HTable.DaemonThreadFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.rest.protobuf.generated.ScannerMessage;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
@ -57,7 +61,7 @@ public class TestHCM {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.startMiniCluster(2);
}
@AfterClass public static void tearDownAfterClass() throws Exception {
@ -121,7 +125,7 @@ public class TestHCM {
* that we really delete it.
* @throws Exception
*/
@Test
@Test(timeout = 60000)
public void testRegionCaching() throws Exception{
HTable table = TEST_UTIL.createTable(TABLE_NAME, FAM_NAM);
TEST_UTIL.createMultiRegions(table, FAM_NAM);
@ -130,10 +134,142 @@ public class TestHCM {
table.put(put);
HConnectionManager.HConnectionImplementation conn =
(HConnectionManager.HConnectionImplementation)table.getConnection();
assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
conn.deleteCachedLocation(TABLE_NAME, ROW);
assertNotNull(conn.getCachedLocation(TABLE_NAME.clone(), ROW.clone()));
assertNotNull(conn.getCachedLocation(
Bytes.toString(TABLE_NAME).getBytes() , Bytes.toString(ROW).getBytes()));
final int nextPort = conn.getCachedLocation(TABLE_NAME, ROW).getPort() + 1;
conn.updateCachedLocation(conn.getCachedLocation(TABLE_NAME, ROW), "127.0.0.1", nextPort);
Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW).getPort(), nextPort);
conn.deleteCachedLocation(TABLE_NAME.clone(), ROW.clone());
HRegionLocation rl = conn.getCachedLocation(TABLE_NAME, ROW);
assertNull("What is this location?? " + rl, rl);
// We're now going to move the region and check that it works for the client
// First a new put to add the location in the cache
conn.clearRegionCache(TABLE_NAME);
Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME));
Put put2 = new Put(ROW);
put2.add(FAM_NAM, ROW, ROW);
table.put(put2);
assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
// We can wait for all regions to be onlines, that makes log reading easier when debugging
while (!TEST_UTIL.getMiniHBaseCluster().getMaster().
getAssignmentManager().getRegionsInTransition().isEmpty()) {
}
// Now moving the region to the second server
TEST_UTIL.getHBaseAdmin().balanceSwitch(false);
HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW);
byte[] regionName = toMove.getRegionInfo().getRegionName();
// Choose the other server.
int curServerId = TEST_UTIL.getHBaseCluster().getServerWith( regionName );
int destServerId = (curServerId == 0 ? 1 : 0);
HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
ServerName destServerName = destServer.getServerName();
// Check that we are in the expected state
Assert.assertTrue(curServer != destServer);
Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName()));
Assert.assertFalse( toMove.getPort() == destServerName.getPort());
Assert.assertNotNull(curServer.getOnlineRegion(regionName));
Assert.assertNull(destServer.getOnlineRegion(regionName));
// Moving. It's possible that we don't have all the regions online at this point, so
// the test must depends only on the region we're looking at.
LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
TEST_UTIL.getHBaseAdmin().move(
toMove.getRegionInfo().getEncodedNameAsBytes(),
destServerName.getServerName().getBytes()
);
while ( destServer.getOnlineRegion(regionName) == null ){
// wait for the move to be finished
}
// Check our new state.
Assert.assertNull(curServer.getOnlineRegion(regionName));
Assert.assertNotNull(destServer.getOnlineRegion(regionName));
LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
// Cache was NOT updated and points to the wrong server
Assert.assertFalse( conn.getCachedLocation(TABLE_NAME, ROW).getPort() == destServerName.getPort());
// Hijack the number of retry to fail immediately instead of retrying: there will be no new
// connection to the master
Field numRetries = conn.getClass().getDeclaredField("numRetries");
numRetries.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(numRetries, numRetries.getModifiers() & ~Modifier.FINAL);
final int prevNumRetriesVal = (Integer)numRetries.get(conn);
numRetries.set(conn, 1);
// We do a put and expect the cache to be updated, even if we don't retry
LOG.info("Put starting");
Put put3 = new Put(ROW);
put3.add(FAM_NAM, ROW, ROW);
try {
table.put(put3);
Assert.assertFalse("Unreachable point", true);
}catch (Throwable e){
LOG.info("Put done, expected exception caught: "+e.getClass());
}
Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
Assert.assertEquals(
"Previous server was "+curServer.getServerName().getHostAndPort(),
destServerName.getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getPort());
// We move it back to do another test with a scan
LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
TEST_UTIL.getHBaseAdmin().move(
toMove.getRegionInfo().getEncodedNameAsBytes(),
curServer.getServerName().getServerName().getBytes()
);
while ( curServer.getOnlineRegion(regionName) == null ){
// wait for the move to be finished
}
// Check our new state.
Assert.assertNotNull(curServer.getOnlineRegion(regionName));
Assert.assertNull(destServer.getOnlineRegion(regionName));
LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
// Cache was NOT updated and points to the wrong server
Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getPort() ==
curServer.getServerName().getPort());
Scan sc = new Scan();
sc.setStopRow(ROW);
sc.setStopRow(ROW);
try {
ResultScanner rs = table.getScanner(sc);
while (rs.next() != null){}
Assert.assertFalse("Unreachable point", true);
}catch (Throwable e){
LOG.info("Put done, expected exception caught: "+e.getClass());
}
// Cache is updated with the right value.
Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
Assert.assertEquals(
"Previous server was "+destServer.getServerName().getHostAndPort(),
curServer.getServerName().getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getPort());
numRetries.set(conn, prevNumRetriesVal);
table.close();
}

View File

@ -250,7 +250,7 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
}
@Override
public boolean removeFromOnlineRegions(String encodedRegionName) {
public boolean removeFromOnlineRegions(String encodedRegionName, ServerName destination) {
// TODO Auto-generated method stub
return false;
}

View File

@ -50,7 +50,7 @@ public class MockRegionServerServices implements RegionServerServices {
private HFileSystem hfs = null;
@Override
public boolean removeFromOnlineRegions(String encodedRegionName) {
public boolean removeFromOnlineRegions(String encodedRegionName, ServerName destination) {
return this.regions.remove(encodedRegionName) != null;
}