diff --git a/src/main/java/org/apache/hadoop/hbase/RegionMovedException.java b/src/main/java/org/apache/hadoop/hbase/RegionMovedException.java new file mode 100644 index 00000000000..cbdf570bb3b --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/RegionMovedException.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ipc.RemoteException; + +/** + * Subclass if the server knows the region is now on another server. + * This allows the client to call the new region server without calling the master. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class RegionMovedException extends NotServingRegionException { + private static final Log LOG = LogFactory.getLog(RegionMovedException.class); + private static final long serialVersionUID = -7232903522310558397L; + + private final String hostname; + private final int port; + + private static final String HOST_FIELD = "hostname="; + private static final String PORT_FIELD = "port="; + + public RegionMovedException(final String hostname, final int port) { + super(); + this.hostname = hostname; + this.port = port; + } + + public String getHostname() { + return hostname; + } + + public int getPort() { + return port; + } + + /** + * For hadoop.ipc internal call. Do NOT use. + * We have to parse the hostname to recreate the exception. + * The input is the one generated by {@link #getMessage()} + */ + public RegionMovedException(String s) { + int posHostname = s.indexOf(HOST_FIELD) + HOST_FIELD.length(); + int posPort = s.indexOf(PORT_FIELD) + PORT_FIELD.length(); + + String tmpHostname = null; + int tmpPort = -1; + try { + tmpHostname = s.substring(posHostname, s.indexOf(' ', posHostname)); + tmpPort = Integer.parseInt(s.substring(posPort)); + } catch (Exception ignored) { + LOG.warn("Can't parse the hostname and the port from this string: " + s + ", "+ + "Continuing"); + } + + hostname = tmpHostname; + port = tmpPort; + } + + @Override + public String getMessage() { + return "Region moved to: " + HOST_FIELD + hostname + " " + PORT_FIELD + port; + } + + /** + * Look for a RegionMovedException in the exception: + * - hadoop.ipc wrapped exceptions + * - nested exceptions + * Returns null if we didn't find the exception or if it was not readable. + */ + public static RegionMovedException find(Object exception) { + if (exception == null || !(exception instanceof Throwable)){ + return null; + } + + Throwable cur = (Throwable)exception; + RegionMovedException res = null; + + while (res == null && cur != null) { + if (cur instanceof RegionMovedException) { + res = (RegionMovedException) cur; + } else { + if (cur instanceof RemoteException) { + RemoteException re = (RemoteException) cur; + Exception e = re.unwrapRemoteException(RegionMovedException.class); + if (e == null){ + e = re.unwrapRemoteException(); + } + // unwrapRemoteException can return the exception given as a parameter when it cannot + // unwrap it. In this case, there is no need to look further + // noinspection ObjectEquality + if (e != re){ + res = find(e); + } + } + cur = cur.getCause(); + } + } + + if (res != null && (res.getPort() < 0 || res.getHostname() == null)){ + // We failed to parse the exception. Let's act as we don't find the exception. + return null; + } else { + return res; + } + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 11d8bf912a2..5cac9af9dfb 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.RegionMovedException; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; @@ -1716,6 +1717,81 @@ public class HConnectionManager { }; } + void updateCachedLocation(HRegionLocation hrl, String hostname, int port) { + HRegionLocation newHrl = new HRegionLocation(hrl.getRegionInfo(), hostname, port); + synchronized (this.cachedRegionLocations) { + cacheLocation(hrl.getRegionInfo().getTableName(), newHrl); + } + } + + void deleteCachedLocation(HRegionLocation rl) { + synchronized (this.cachedRegionLocations) { + Map tableLocations = + getTableLocations(rl.getRegionInfo().getTableName()); + tableLocations.remove(rl.getRegionInfo().getStartKey()); + } + } + + private void updateCachedLocations( + UpdateHistory updateHistory, HRegionLocation hrl, Object t) { + updateCachedLocations(updateHistory, hrl, null, null, t); + } + + private void updateCachedLocations(UpdateHistory updateHistory, byte[] tableName, Row row, + Object t) { + updateCachedLocations(updateHistory, null, tableName, row, t); + } + + /** + * Update the location with the new value (if the exception is a RegionMovedException) or delete + * it from the cache. + * We need to keep an history of the modifications, because we can have first an update then a + * delete. The delete would remove the update. + * @param updateHistory - The set used for the history + * @param hrl - can be null. If it's the case, tableName and row should not be null + * @param tableName - can be null if hrl is not null. + * @param row - can be null if hrl is not null. + * @param exception - An object (to simplify user code) on which we will try to find a nested + * or wrapped or both RegionMovedException + */ + private void updateCachedLocations( + UpdateHistory updateHistory, final HRegionLocation hrl, final byte[] tableName, + Row row, final Object exception) { + + if ((row == null || tableName == null) && hrl == null){ + LOG.warn ("Coding error, see method javadoc. row="+row+", tableName="+ + Bytes.toString(tableName)+", hrl="+hrl); + return; + } + + // Is it something we have already updated? + final HRegionLocation myLoc = (hrl != null ? + hrl : getCachedLocation(tableName, row.getRow())); + if (myLoc == null) { + // There is no such location in the cache => it's been removed already => nothing to do + return; + } + + final String regionName = myLoc.getRegionInfo().getRegionNameAsString(); + + if (updateHistory.contains(regionName)) { + // Already updated/deleted => nothing to do + return; + } + + updateHistory.add(regionName); + + final RegionMovedException rme = RegionMovedException.find(exception); + if (rme != null) { + LOG.info("Region " + myLoc.getRegionInfo().getRegionNameAsString() + " moved from " + + myLoc.getHostnamePort() + ", updating client location cache." + + " New server: " + rme.getHostname() + ":" + rme.getPort()); + updateCachedLocation(myLoc, rme.getHostname(), rme.getPort()); + } else { + deleteCachedLocation(myLoc); + } + } + @Override @Deprecated public void processBatch(List list, @@ -1799,6 +1875,19 @@ public class HConnectionManager { } } + private static class UpdateHistory{ + private final Set updateHistory = new HashSet(100); // size: if we're doing a + // rolling restart we may have 100 regions with a wrong location if we're really unlucky + + public boolean contains(String regionName) { + return updateHistory.contains(regionName); + } + + public void add(String regionName) { + updateHistory.add(regionName); + } + } + /** * Parameterized batch processing, allowing varying return types for * different {@link Row} implementations. @@ -1833,6 +1922,7 @@ public class HConnectionManager { int actionCount = 0; for (int tries = 0; tries < numRetries && retry; ++tries) { + UpdateHistory updateHistory = new UpdateHistory(); // sleep first, if this is a retry if (tries >= 1) { @@ -1910,6 +2000,7 @@ public class HConnectionManager { } } catch (ExecutionException e) { LOG.debug("Failed all from " + loc, e); + updateCachedLocations(updateHistory, loc, e); } } @@ -1931,7 +2022,7 @@ public class HConnectionManager { actionCount++; Row row = list.get(i); workingList.add(row); - deleteCachedLocation(tableName, row.getRow()); + updateCachedLocations(updateHistory, tableName, row, results[i]); } else { if (results[i] != null && results[i] instanceof Throwable) { actionCount++; diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 16dc6f4a916..6609f337220 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -1943,7 +1943,7 @@ public class AssignmentManager extends ZooKeeperListener { * @param region server to be unassigned * @param force if region should be closed even if already closing */ - public void unassign(HRegionInfo region, boolean force) { + public void unassign(HRegionInfo region, boolean force, ServerName dest) { // TODO: Method needs refactoring. Ugly buried returns throughout. Beware! LOG.debug("Starting unassignment of region " + region.getRegionNameAsString() + " (offlining)"); @@ -2045,7 +2045,7 @@ public class AssignmentManager extends ZooKeeperListener { // TODO: We should consider making this look more like it does for the // region open where we catch all throwables and never abort if (serverManager.sendRegionClose(server, state.getRegion(), - versionOfClosingNode)) { + versionOfClosingNode, dest)) { LOG.debug("Sent CLOSE to " + server + " for region " + region.getRegionNameAsString()); return; @@ -2086,10 +2086,14 @@ public class AssignmentManager extends ZooKeeperListener { state.update(state.getState()); } LOG.info("Server " + server + " returned " + t + " for " + - region.getRegionNameAsString()); + region.getRegionNameAsString(), t); // Presume retry or server will expire. } } + + public void unassign(HRegionInfo region, boolean force){ + unassign(region, force, null); + } private void deleteClosingOrClosedNode(HRegionInfo region) { try { @@ -3228,7 +3232,7 @@ public class AssignmentManager extends ZooKeeperListener { synchronized (this.regionPlans) { this.regionPlans.put(plan.getRegionName(), plan); } - unassign(plan.getRegionInfo()); + unassign(plan.getRegionInfo(), false, plan.getDestination()); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 12c8f5801a2..29e1066bb29 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -528,11 +528,12 @@ public class ServerManager { * @param versionOfClosingNode * the version of znode to compare when RS transitions the znode from * CLOSING state. + * @param dest - if the region is moved to another server, the destination server. null otherwise. * @return true if server acknowledged close, false if not * @throws IOException */ public boolean sendRegionClose(ServerName server, HRegionInfo region, - int versionOfClosingNode) throws IOException { + int versionOfClosingNode, ServerName dest) throws IOException { if (server == null) throw new NullPointerException("Passed server is null"); AdminProtocol admin = getServerConnection(server); if (admin == null) { @@ -542,16 +543,21 @@ public class ServerManager { " failed because no RPC connection found to this server"); } return ProtobufUtil.closeRegion(admin, region.getRegionName(), - versionOfClosingNode); + versionOfClosingNode, dest); } - /** - * @param sn - * @return - * @throws IOException - * @throws RetriesExhaustedException wrapping a ConnectException if failed - * putting up proxy. - */ + public boolean sendRegionClose(ServerName server, HRegionInfo region, + int versionOfClosingNode) throws IOException { + return sendRegionClose(server, region, versionOfClosingNode, null); + } + + /** + * @param sn + * @return + * @throws IOException + * @throws RetriesExhaustedException wrapping a ConnectException if failed + * putting up proxy. + */ private AdminProtocol getServerConnection(final ServerName sn) throws IOException { AdminProtocol admin = this.serverConnections.get(sn.toString()); diff --git a/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 21e4211d650..ccc964e52b6 100644 --- a/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -1282,6 +1282,29 @@ public final class ProtobufUtil { } } + /** + * A helper to close a region given a region name + * using admin protocol. + * + * @param admin + * @param regionName + * @param versionOfClosingNode + * @return true if the region is closed + * @throws IOException + */ + public static boolean closeRegion(final AdminProtocol admin, final byte[] regionName, + final int versionOfClosingNode, final ServerName destinationServer) throws IOException { + CloseRegionRequest closeRegionRequest = + RequestConverter.buildCloseRegionRequest(regionName, versionOfClosingNode, destinationServer); + try { + CloseRegionResponse response = admin.closeRegion(null, closeRegionRequest); + return ResponseConverter.isClosed(response); + } catch (ServiceException se) { + throw getRemoteException(se); + } + } + + /** * A helper to open a region using admin protocol. * diff --git a/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index a5e03e1ec05..dabfbab3355 100644 --- a/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Action; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; @@ -643,6 +644,19 @@ public final class RequestConverter { return builder.build(); } + public static CloseRegionRequest buildCloseRegionRequest( + final byte[] regionName, final int versionOfClosingNode, ServerName destinationServer) { + CloseRegionRequest.Builder builder = CloseRegionRequest.newBuilder(); + RegionSpecifier region = buildRegionSpecifier( + RegionSpecifierType.REGION_NAME, regionName); + builder.setRegion(region); + builder.setVersionOfClosingNode(versionOfClosingNode); + if (destinationServer != null){ + builder.setDestinationServer(ProtobufUtil.toServerName( destinationServer) ); + } + return builder.build(); + } + /** * Create a CloseRegionRequest for a given encoded region name * diff --git a/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java index 7664650313c..ca0d5db1c10 100644 --- a/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java +++ b/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java @@ -4031,6 +4031,11 @@ public final class AdminProtos { // optional bool transitionInZK = 3 [default = true]; boolean hasTransitionInZK(); boolean getTransitionInZK(); + + // optional .ServerName destinationServer = 4; + boolean hasDestinationServer(); + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getDestinationServer(); + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getDestinationServerOrBuilder(); } public static final class CloseRegionRequest extends com.google.protobuf.GeneratedMessage @@ -4094,10 +4099,24 @@ public final class AdminProtos { return transitionInZK_; } + // optional .ServerName destinationServer = 4; + public static final int DESTINATIONSERVER_FIELD_NUMBER = 4; + private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName destinationServer_; + public boolean hasDestinationServer() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getDestinationServer() { + return destinationServer_; + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getDestinationServerOrBuilder() { + return destinationServer_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); versionOfClosingNode_ = 0; transitionInZK_ = true; + destinationServer_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -4112,6 +4131,12 @@ public final class AdminProtos { memoizedIsInitialized = 0; return false; } + if (hasDestinationServer()) { + if (!getDestinationServer().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } memoizedIsInitialized = 1; return true; } @@ -4128,6 +4153,9 @@ public final class AdminProtos { if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBool(3, transitionInZK_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeMessage(4, destinationServer_); + } getUnknownFields().writeTo(output); } @@ -4149,6 +4177,10 @@ public final class AdminProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(3, transitionInZK_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(4, destinationServer_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -4187,6 +4219,11 @@ public final class AdminProtos { result = result && (getTransitionInZK() == other.getTransitionInZK()); } + result = result && (hasDestinationServer() == other.hasDestinationServer()); + if (hasDestinationServer()) { + result = result && getDestinationServer() + .equals(other.getDestinationServer()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -4208,6 +4245,10 @@ public final class AdminProtos { hash = (37 * hash) + TRANSITIONINZK_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getTransitionInZK()); } + if (hasDestinationServer()) { + hash = (37 * hash) + DESTINATIONSERVER_FIELD_NUMBER; + hash = (53 * hash) + getDestinationServer().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); return hash; } @@ -4317,6 +4358,7 @@ public final class AdminProtos { private void maybeForceBuilderInitialization() { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { getRegionFieldBuilder(); + getDestinationServerFieldBuilder(); } } private static Builder create() { @@ -4335,6 +4377,12 @@ public final class AdminProtos { bitField0_ = (bitField0_ & ~0x00000002); transitionInZK_ = true; bitField0_ = (bitField0_ & ~0x00000004); + if (destinationServerBuilder_ == null) { + destinationServer_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + } else { + destinationServerBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -4389,6 +4437,14 @@ public final class AdminProtos { to_bitField0_ |= 0x00000004; } result.transitionInZK_ = transitionInZK_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + if (destinationServerBuilder_ == null) { + result.destinationServer_ = destinationServer_; + } else { + result.destinationServer_ = destinationServerBuilder_.build(); + } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -4414,6 +4470,9 @@ public final class AdminProtos { if (other.hasTransitionInZK()) { setTransitionInZK(other.getTransitionInZK()); } + if (other.hasDestinationServer()) { + mergeDestinationServer(other.getDestinationServer()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -4427,6 +4486,12 @@ public final class AdminProtos { return false; } + if (hasDestinationServer()) { + if (!getDestinationServer().isInitialized()) { + + return false; + } + } return true; } @@ -4472,6 +4537,15 @@ public final class AdminProtos { transitionInZK_ = input.readBool(); break; } + case 34: { + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder subBuilder = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(); + if (hasDestinationServer()) { + subBuilder.mergeFrom(getDestinationServer()); + } + input.readMessage(subBuilder, extensionRegistry); + setDestinationServer(subBuilder.buildPartial()); + break; + } } } } @@ -4610,6 +4684,96 @@ public final class AdminProtos { return this; } + // optional .ServerName destinationServer = 4; + private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName destinationServer_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> destinationServerBuilder_; + public boolean hasDestinationServer() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getDestinationServer() { + if (destinationServerBuilder_ == null) { + return destinationServer_; + } else { + return destinationServerBuilder_.getMessage(); + } + } + public Builder setDestinationServer(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) { + if (destinationServerBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + destinationServer_ = value; + onChanged(); + } else { + destinationServerBuilder_.setMessage(value); + } + bitField0_ |= 0x00000008; + return this; + } + public Builder setDestinationServer( + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder builderForValue) { + if (destinationServerBuilder_ == null) { + destinationServer_ = builderForValue.build(); + onChanged(); + } else { + destinationServerBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000008; + return this; + } + public Builder mergeDestinationServer(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) { + if (destinationServerBuilder_ == null) { + if (((bitField0_ & 0x00000008) == 0x00000008) && + destinationServer_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance()) { + destinationServer_ = + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(destinationServer_).mergeFrom(value).buildPartial(); + } else { + destinationServer_ = value; + } + onChanged(); + } else { + destinationServerBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000008; + return this; + } + public Builder clearDestinationServer() { + if (destinationServerBuilder_ == null) { + destinationServer_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + onChanged(); + } else { + destinationServerBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000008); + return this; + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder getDestinationServerBuilder() { + bitField0_ |= 0x00000008; + onChanged(); + return getDestinationServerFieldBuilder().getBuilder(); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getDestinationServerOrBuilder() { + if (destinationServerBuilder_ != null) { + return destinationServerBuilder_.getMessageOrBuilder(); + } else { + return destinationServer_; + } + } + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> + getDestinationServerFieldBuilder() { + if (destinationServerBuilder_ == null) { + destinationServerBuilder_ = new com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>( + destinationServer_, + getParentForChildren(), + isClean()); + destinationServer_ = null; + } + return destinationServerBuilder_; + } + // @@protoc_insertion_point(builder_scope:CloseRegionRequest) } @@ -15511,62 +15675,63 @@ public final class AdminProtos { "esponse\022<\n\014openingState\030\001 \003(\0162&.OpenRegi" + "onResponse.RegionOpeningState\"H\n\022RegionO" + "peningState\022\n\n\006OPENED\020\000\022\022\n\016ALREADY_OPENE" + - "D\020\001\022\022\n\016FAILED_OPENING\020\002\"r\n\022CloseRegionRe" + - "quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" + - "\034\n\024versionOfClosingNode\030\002 \001(\r\022\034\n\016transit" + - "ionInZK\030\003 \001(\010:\004true\"%\n\023CloseRegionRespon" + - "se\022\016\n\006closed\030\001 \002(\010\"M\n\022FlushRegionRequest" + - "\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\025\n\rif", - "OlderThanTs\030\002 \001(\004\"=\n\023FlushRegionResponse" + - "\022\025\n\rlastFlushTime\030\001 \002(\004\022\017\n\007flushed\030\002 \001(\010" + - "\"J\n\022SplitRegionRequest\022 \n\006region\030\001 \002(\0132\020" + - ".RegionSpecifier\022\022\n\nsplitPoint\030\002 \001(\014\"\025\n\023" + - "SplitRegionResponse\"G\n\024CompactRegionRequ" + - "est\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\r\n" + - "\005major\030\002 \001(\010\"\027\n\025CompactRegionResponse\"1\n" + - "\004UUID\022\024\n\014leastSigBits\030\001 \002(\004\022\023\n\013mostSigBi" + - "ts\030\002 \002(\004\"\270\003\n\010WALEntry\022\035\n\003key\030\001 \002(\0132\020.WAL" + - "Entry.WALKey\022\037\n\004edit\030\002 \002(\0132\021.WALEntry.WA", - "LEdit\032~\n\006WALKey\022\031\n\021encodedRegionName\030\001 \002" + - "(\014\022\021\n\ttableName\030\002 \002(\014\022\031\n\021logSequenceNumb" + - "er\030\003 \002(\004\022\021\n\twriteTime\030\004 \002(\004\022\030\n\tclusterId" + - "\030\005 \001(\0132\005.UUID\032\353\001\n\007WALEdit\022\025\n\rkeyValueByt" + - "es\030\001 \003(\014\0222\n\013familyScope\030\002 \003(\0132\035.WALEntry" + - ".WALEdit.FamilyScope\032M\n\013FamilyScope\022\016\n\006f" + - "amily\030\001 \002(\014\022.\n\tscopeType\030\002 \002(\0162\033.WALEntr" + - "y.WALEdit.ScopeType\"F\n\tScopeType\022\033\n\027REPL" + - "ICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCO" + - "PE_GLOBAL\020\001\"4\n\030ReplicateWALEntryRequest\022", - "\030\n\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031ReplicateWA" + - "LEntryResponse\"\026\n\024RollWALWriterRequest\"." + - "\n\025RollWALWriterResponse\022\025\n\rregionToFlush" + - "\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006reason\030\001 " + - "\002(\t\"\024\n\022StopServerResponse\"\026\n\024GetServerIn" + - "foRequest\"@\n\nServerInfo\022\037\n\nserverName\030\001 " + - "\002(\0132\013.ServerName\022\021\n\twebuiPort\030\002 \001(\r\"8\n\025G" + - "etServerInfoResponse\022\037\n\nserverInfo\030\001 \002(\013" + - "2\013.ServerInfo2\371\005\n\014AdminService\022>\n\rgetReg" + - "ionInfo\022\025.GetRegionInfoRequest\032\026.GetRegi", - "onInfoResponse\022;\n\014getStoreFile\022\024.GetStor" + - "eFileRequest\032\025.GetStoreFileResponse\022D\n\017g" + - "etOnlineRegion\022\027.GetOnlineRegionRequest\032" + - "\030.GetOnlineRegionResponse\0225\n\nopenRegion\022" + - "\022.OpenRegionRequest\032\023.OpenRegionResponse" + - "\0228\n\013closeRegion\022\023.CloseRegionRequest\032\024.C" + - "loseRegionResponse\0228\n\013flushRegion\022\023.Flus" + - "hRegionRequest\032\024.FlushRegionResponse\0228\n\013" + - "splitRegion\022\023.SplitRegionRequest\032\024.Split" + - "RegionResponse\022>\n\rcompactRegion\022\025.Compac", - "tRegionRequest\032\026.CompactRegionResponse\022J" + - "\n\021replicateWALEntry\022\031.ReplicateWALEntryR" + - "equest\032\032.ReplicateWALEntryResponse\022>\n\rro" + - "llWALWriter\022\025.RollWALWriterRequest\032\026.Rol" + - "lWALWriterResponse\022>\n\rgetServerInfo\022\025.Ge" + - "tServerInfoRequest\032\026.GetServerInfoRespon" + - "se\0225\n\nstopServer\022\022.StopServerRequest\032\023.S" + - "topServerResponseBA\n*org.apache.hadoop.h" + - "base.protobuf.generatedB\013AdminProtosH\001\210\001" + - "\001\240\001\001" + "D\020\001\022\022\n\016FAILED_OPENING\020\002\"\232\001\n\022CloseRegionR" + + "equest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier" + + "\022\034\n\024versionOfClosingNode\030\002 \001(\r\022\034\n\016transi" + + "tionInZK\030\003 \001(\010:\004true\022&\n\021destinationServe" + + "r\030\004 \001(\0132\013.ServerName\"%\n\023CloseRegionRespo" + + "nse\022\016\n\006closed\030\001 \002(\010\"M\n\022FlushRegionReques", + "t\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\025\n\ri" + + "fOlderThanTs\030\002 \001(\004\"=\n\023FlushRegionRespons" + + "e\022\025\n\rlastFlushTime\030\001 \002(\004\022\017\n\007flushed\030\002 \001(" + + "\010\"J\n\022SplitRegionRequest\022 \n\006region\030\001 \002(\0132" + + "\020.RegionSpecifier\022\022\n\nsplitPoint\030\002 \001(\014\"\025\n" + + "\023SplitRegionResponse\"G\n\024CompactRegionReq" + + "uest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\r" + + "\n\005major\030\002 \001(\010\"\027\n\025CompactRegionResponse\"1" + + "\n\004UUID\022\024\n\014leastSigBits\030\001 \002(\004\022\023\n\013mostSigB" + + "its\030\002 \002(\004\"\270\003\n\010WALEntry\022\035\n\003key\030\001 \002(\0132\020.WA", + "LEntry.WALKey\022\037\n\004edit\030\002 \002(\0132\021.WALEntry.W" + + "ALEdit\032~\n\006WALKey\022\031\n\021encodedRegionName\030\001 " + + "\002(\014\022\021\n\ttableName\030\002 \002(\014\022\031\n\021logSequenceNum" + + "ber\030\003 \002(\004\022\021\n\twriteTime\030\004 \002(\004\022\030\n\tclusterI" + + "d\030\005 \001(\0132\005.UUID\032\353\001\n\007WALEdit\022\025\n\rkeyValueBy" + + "tes\030\001 \003(\014\0222\n\013familyScope\030\002 \003(\0132\035.WALEntr" + + "y.WALEdit.FamilyScope\032M\n\013FamilyScope\022\016\n\006" + + "family\030\001 \002(\014\022.\n\tscopeType\030\002 \002(\0162\033.WALEnt" + + "ry.WALEdit.ScopeType\"F\n\tScopeType\022\033\n\027REP" + + "LICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SC", + "OPE_GLOBAL\020\001\"4\n\030ReplicateWALEntryRequest" + + "\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031ReplicateW" + + "ALEntryResponse\"\026\n\024RollWALWriterRequest\"" + + ".\n\025RollWALWriterResponse\022\025\n\rregionToFlus" + + "h\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006reason\030\001" + + " \002(\t\"\024\n\022StopServerResponse\"\026\n\024GetServerI" + + "nfoRequest\"@\n\nServerInfo\022\037\n\nserverName\030\001" + + " \002(\0132\013.ServerName\022\021\n\twebuiPort\030\002 \001(\r\"8\n\025" + + "GetServerInfoResponse\022\037\n\nserverInfo\030\001 \002(" + + "\0132\013.ServerInfo2\371\005\n\014AdminService\022>\n\rgetRe", + "gionInfo\022\025.GetRegionInfoRequest\032\026.GetReg" + + "ionInfoResponse\022;\n\014getStoreFile\022\024.GetSto" + + "reFileRequest\032\025.GetStoreFileResponse\022D\n\017" + + "getOnlineRegion\022\027.GetOnlineRegionRequest" + + "\032\030.GetOnlineRegionResponse\0225\n\nopenRegion" + + "\022\022.OpenRegionRequest\032\023.OpenRegionRespons" + + "e\0228\n\013closeRegion\022\023.CloseRegionRequest\032\024." + + "CloseRegionResponse\0228\n\013flushRegion\022\023.Flu" + + "shRegionRequest\032\024.FlushRegionResponse\0228\n" + + "\013splitRegion\022\023.SplitRegionRequest\032\024.Spli", + "tRegionResponse\022>\n\rcompactRegion\022\025.Compa" + + "ctRegionRequest\032\026.CompactRegionResponse\022" + + "J\n\021replicateWALEntry\022\031.ReplicateWALEntry" + + "Request\032\032.ReplicateWALEntryResponse\022>\n\rr" + + "ollWALWriter\022\025.RollWALWriterRequest\032\026.Ro" + + "llWALWriterResponse\022>\n\rgetServerInfo\022\025.G" + + "etServerInfoRequest\032\026.GetServerInfoRespo" + + "nse\0225\n\nstopServer\022\022.StopServerRequest\032\023." + + "StopServerResponseBA\n*org.apache.hadoop." + + "hbase.protobuf.generatedB\013AdminProtosH\001\210", + "\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -15642,7 +15807,7 @@ public final class AdminProtos { internal_static_CloseRegionRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_CloseRegionRequest_descriptor, - new java.lang.String[] { "Region", "VersionOfClosingNode", "TransitionInZK", }, + new java.lang.String[] { "Region", "VersionOfClosingNode", "TransitionInZK", "DestinationServer", }, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest.class, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest.Builder.class); internal_static_CloseRegionResponse_descriptor = diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 7eceac15b1e..f7ac81a5a3f 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -39,6 +39,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -73,6 +74,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RegionMovedException; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; @@ -230,7 +232,7 @@ import com.google.protobuf.RpcController; */ @InterfaceAudience.Private @SuppressWarnings("deprecation") -public class HRegionServer implements ClientProtocol, +public class HRegionServer implements ClientProtocol, AdminProtocol, Runnable, RegionServerServices, HBaseRPCErrorHandler { public static final Log LOG = LogFactory.getLog(HRegionServer.class); @@ -416,6 +418,12 @@ public class HRegionServer implements ClientProtocol, */ private ObjectName mxBean = null; + /** + * Chore to clean periodically the moved region list + */ + private MovedRegionsCleaner movedRegionsCleaner; + + /** * Starts a HRegionServer at the default location * @@ -709,6 +717,9 @@ public class HRegionServer implements ClientProtocol, thriftServer.start(); LOG.info("Started Thrift API from Region Server."); } + + // Create the thread to clean the moved regions list + movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this); } /** @@ -805,6 +816,8 @@ public class HRegionServer implements ClientProtocol, cacheConfig.getBlockCache().shutdown(); } + movedRegionsCleaner.stop("Region Server stopping"); + // Send interrupts to wake up threads if sleeping so they notice shutdown. // TODO: Should we check they are alive? If OOME could have exited already if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary(); @@ -2048,21 +2061,6 @@ public class HRegionServer implements ClientProtocol, this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region); } - @Override - public boolean removeFromOnlineRegions(final String encodedName) { - HRegion toReturn = null; - toReturn = this.onlineRegions.remove(encodedName); - - //Clear all of the dynamic metrics as they are now probably useless. - //This is a clear because dynamic metrics could include metrics per cf and - //per hfile. Figuring out which cfs, hfiles, and regions are still relevant to - //this region server would be an onerous task. Instead just clear everything - //and on the next tick of the metrics everything that is still relevant will be - //re-added. - this.dynamicMetrics.clear(); - return toReturn != null; - } - /** * @return A new Map of online regions sorted by region size with the first * entry being the biggest. @@ -2491,7 +2489,7 @@ public class HRegionServer implements ClientProtocol, */ protected boolean closeRegion(HRegionInfo region, final boolean abort, final boolean zk) { - return closeRegion(region, abort, zk, -1); + return closeRegion(region, abort, zk, -1, null); } /** @@ -2506,7 +2504,7 @@ public class HRegionServer implements ClientProtocol, * @return True if closed a region. */ protected boolean closeRegion(HRegionInfo region, final boolean abort, - final boolean zk, final int versionOfClosingNode) { + final boolean zk, final int versionOfClosingNode, ServerName sn) { if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) { LOG.warn("Received close for region we are already opening or closing; " + region.getEncodedName()); @@ -2521,8 +2519,7 @@ public class HRegionServer implements ClientProtocol, crh = new CloseMetaHandler(this, this, region, abort, zk, versionOfClosingNode); } else { - crh = new CloseRegionHandler(this, this, region, abort, zk, - versionOfClosingNode); + crh = new CloseRegionHandler(this, this, region, abort, zk, versionOfClosingNode, sn); } this.service.submit(crh); return true; @@ -2543,6 +2540,25 @@ public class HRegionServer implements ClientProtocol, return this.onlineRegions.get(encodedRegionName); } + + @Override + public boolean removeFromOnlineRegions(final String encodedRegionName, ServerName destination) { + HRegion toReturn = this.onlineRegions.remove(encodedRegionName); + + if (destination != null){ + addToMovedRegions(encodedRegionName, destination); + } + + //Clear all of the dynamic metrics as they are now probably useless. + //This is a clear because dynamic metrics could include metrics per cf and + //per hfile. Figuring out which cfs, hfiles, and regions are still relevant to + //this region server would be an onerous task. Instead just clear everything + //and on the next tick of the metrics everything that is still relevant will be + //re-added. + this.dynamicMetrics.clear(); + return toReturn != null; + } + /** * Protected utility method for safely obtaining an HRegion handle. * @@ -2553,11 +2569,21 @@ public class HRegionServer implements ClientProtocol, */ protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException { - HRegion region = null; - region = getOnlineRegion(regionName); + String encodedRegionName = HRegionInfo.encodeRegionName(regionName); + return getRegionByEncodedName(encodedRegionName); + } + + protected HRegion getRegionByEncodedName(String encodedRegionName) + throws NotServingRegionException { + + HRegion region = this.onlineRegions.get(encodedRegionName); if (region == null) { - throw new NotServingRegionException("Region is not online: " + - Bytes.toStringBinary(regionName)); + ServerName sn = getMovedRegion(encodedRegionName); + if (sn != null) { + throw new RegionMovedException(sn.getHostname(), sn.getPort()); + } else { + throw new NotServingRegionException("Region is not online: " + encodedRegionName); + } } return region; } @@ -3396,7 +3422,7 @@ public class HRegionServer implements ClientProtocol, } else { LOG.warn("The region " + region.getEncodedName() + " is online on this server but META does not have this server."); - removeFromOnlineRegions(region.getEncodedName()); + removeFromOnlineRegions(region.getEncodedName(), null); } } LOG.info("Received request to open region: " @@ -3438,6 +3464,9 @@ public class HRegionServer implements ClientProtocol, versionOfClosingNode = request.getVersionOfClosingNode(); } boolean zk = request.getTransitionInZK(); + final ServerName sn = (request.hasDestinationServer() ? + ProtobufUtil.toServerName(request.getDestinationServer()) : null); + try { checkOpen(); requestCount.incrementAndGet(); @@ -3445,11 +3474,12 @@ public class HRegionServer implements ClientProtocol, CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder(); LOG.info("Received close region: " + region.getRegionNameAsString() + - ". Version of ZK closing node:" + versionOfClosingNode); + ". Version of ZK closing node:" + versionOfClosingNode + + ". Destination server:" + sn); HRegionInfo regionInfo = region.getRegionInfo(); checkIfRegionInTransition(regionInfo, CLOSE); boolean closed = closeRegion( - regionInfo, false, zk, versionOfClosingNode); + regionInfo, false, zk, versionOfClosingNode, sn); builder.setClosed(closed); return builder.build(); } catch (IOException ie) { @@ -3648,16 +3678,10 @@ public class HRegionServer implements ClientProtocol, RegionSpecifierType type = regionSpecifier.getType(); checkOpen(); switch (type) { - case REGION_NAME: - return getRegion(value); - case ENCODED_REGION_NAME: - String encodedRegionName = Bytes.toString(value); - HRegion region = this.onlineRegions.get(encodedRegionName); - if (region == null) { - throw new NotServingRegionException( - "Region is not online: " + encodedRegionName); - } - return region; + case REGION_NAME: + return getRegion(value); + case ENCODED_REGION_NAME: + return getRegionByEncodedName(Bytes.toString(value)); default: throw new DoNotRetryIOException( "Unsupported region specifier type: " + type); @@ -3793,4 +3817,95 @@ public class HRegionServer implements ClientProtocol, } region.mutateRow(rm); } + + + // This map will containsall the regions that we closed for a move. + // We add the time it was moved as we don't want to keep too old information + protected Map> movedRegions = + new ConcurrentHashMap>(3000); + + // We need a timeout. If not there is a risk of giving a wrong information: this would double + // the number of network calls instead of reducing them. + private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000); + + protected void addToMovedRegions(HRegionInfo hri, ServerName destination){ + addToMovedRegions(hri.getEncodedName(), destination); + } + + protected void addToMovedRegions(String encodedName, ServerName destination){ + final Long time = System.currentTimeMillis(); + + movedRegions.put( + encodedName, + new Pair(time, destination)); + } + + private ServerName getMovedRegion(final String encodedRegionName) { + Pair dest = movedRegions.get(encodedRegionName); + + if (dest != null) { + if (dest.getFirst() > (System.currentTimeMillis() - TIMEOUT_REGION_MOVED)) { + return dest.getSecond(); + } else { + movedRegions.remove(encodedRegionName); + } + } + + return null; + } + + /** + * Remove the expired entries from the moved regions list. + */ + protected void cleanMovedRegions(){ + final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED; + Iterator>> it = movedRegions.entrySet().iterator(); + + while (it.hasNext()){ + Map.Entry> e = it.next(); + if (e.getValue().getFirst() < cutOff){ + it.remove(); + } + } + } + + /** + * Creates a Chore thread to clean the moved region cache. + */ + protected static class MovedRegionsCleaner extends Chore implements Stoppable { + private HRegionServer regionServer; + Stoppable stoppable; + + private MovedRegionsCleaner( + HRegionServer regionServer, Stoppable stoppable){ + super("MovedRegionsCleaner for region "+regionServer, TIMEOUT_REGION_MOVED, stoppable); + this.regionServer = regionServer; + this.stoppable = stoppable; + } + + static MovedRegionsCleaner createAndStart(HRegionServer rs){ + Stoppable stoppable = new Stoppable() { + private volatile boolean isStopped = false; + @Override public void stop(String why) { isStopped = true;} + @Override public boolean isStopped() {return isStopped;} + }; + + return new MovedRegionsCleaner(rs, stoppable); + } + + @Override + protected void chore() { + regionServer.cleanMovedRegions(); + } + + @Override + public void stop(String why) { + stoppable.stop(why); + } + + @Override + public boolean isStopped() { + return stoppable.isStopped(); + } + } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java b/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java index b038ef36afa..5f49877c35e 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java @@ -23,7 +23,9 @@ import java.io.IOException; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; /** * Interface to Map of online regions. In the Map, the key is the region's @@ -41,9 +43,10 @@ interface OnlineRegions extends Server { * This method removes HRegion corresponding to hri from the Map of onlineRegions. * * @param encodedRegionName + * @param destination - destination, if any. Null otherwise * @return True if we removed a region from online list. */ - public boolean removeFromOnlineRegions(String encodedRegionName); + public boolean removeFromOnlineRegions(String encodedRegionName, ServerName destination); /** * Return {@link HRegion} instance. @@ -62,4 +65,4 @@ interface OnlineRegions extends Server { * @throws java.io.IOException */ public List getOnlineRegions(byte[] tableName) throws IOException; -} \ No newline at end of file +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java index 231e2d673e3..6a9f2fe4291 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java @@ -274,7 +274,7 @@ public class SplitTransaction { } if (!testing) { - services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName()); + services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName(), null); } this.journal.add(JournalEntry.OFFLINED_PARENT); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java b/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java index 082bd3c7bb5..e40462a6304 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -60,11 +61,12 @@ public class CloseRegionHandler extends EventHandler { // close -- not the master process so state up in zk will unlikely be // CLOSING. private final boolean zk; + private ServerName destination; // This is executed after receiving an CLOSE RPC from the master. public CloseRegionHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo) { - this(server, rsServices, regionInfo, false, true, -1); + this(server, rsServices, regionInfo, false, true, -1, EventType.M_RS_CLOSE_REGION, null); } /** @@ -80,13 +82,28 @@ public class CloseRegionHandler extends EventHandler { final HRegionInfo regionInfo, final boolean abort, final boolean zk, final int versionOfClosingNode) { this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode, - EventType.M_RS_CLOSE_REGION); + EventType.M_RS_CLOSE_REGION, null); } - protected CloseRegionHandler(final Server server, + public CloseRegionHandler(final Server server, + final RegionServerServices rsServices, + final HRegionInfo regionInfo, final boolean abort, final boolean zk, + final int versionOfClosingNode, ServerName destination) { + this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode, + EventType.M_RS_CLOSE_REGION, destination); + } + + public CloseRegionHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, boolean abort, final boolean zk, final int versionOfClosingNode, EventType eventType) { + this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode, eventType, null); + } + + protected CloseRegionHandler(final Server server, + final RegionServerServices rsServices, HRegionInfo regionInfo, + boolean abort, final boolean zk, final int versionOfClosingNode, + EventType eventType, ServerName destination) { super(server, eventType); this.server = server; this.rsServices = rsServices; @@ -94,6 +111,7 @@ public class CloseRegionHandler extends EventHandler { this.abort = abort; this.zk = zk; this.expectedVersion = versionOfClosingNode; + this.destination = destination; } public HRegionInfo getRegionInfo() { @@ -135,7 +153,7 @@ public class CloseRegionHandler extends EventHandler { throw new RuntimeException(t); } - this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName()); + this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName(), destination); if (this.zk) { if (setClosedState(this.expectedVersion, region)) { @@ -183,5 +201,4 @@ public class CloseRegionHandler extends EventHandler { } return true; } - } diff --git a/src/main/protobuf/Admin.proto b/src/main/protobuf/Admin.proto index f1ee8ff9101..599e36d24aa 100644 --- a/src/main/protobuf/Admin.proto +++ b/src/main/protobuf/Admin.proto @@ -77,6 +77,7 @@ message CloseRegionRequest { required RegionSpecifier region = 1; optional uint32 versionOfClosingNode = 2; optional bool transitionInZK = 3 [default = true]; + optional ServerName destinationServer = 4; } message CloseRegionResponse { diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index e7ceac2107c..f46c8c213d8 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -24,6 +24,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -36,6 +38,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.HTable.DaemonThreadFactory; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.rest.protobuf.generated.ScannerMessage; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.Assert; @@ -57,7 +61,7 @@ public class TestHCM { @BeforeClass public static void setUpBeforeClass() throws Exception { - TEST_UTIL.startMiniCluster(1); + TEST_UTIL.startMiniCluster(2); } @AfterClass public static void tearDownAfterClass() throws Exception { @@ -121,7 +125,7 @@ public class TestHCM { * that we really delete it. * @throws Exception */ - @Test + @Test(timeout = 60000) public void testRegionCaching() throws Exception{ HTable table = TEST_UTIL.createTable(TABLE_NAME, FAM_NAM); TEST_UTIL.createMultiRegions(table, FAM_NAM); @@ -129,11 +133,143 @@ public class TestHCM { put.add(FAM_NAM, ROW, ROW); table.put(put); HConnectionManager.HConnectionImplementation conn = - (HConnectionManager.HConnectionImplementation)table.getConnection(); + (HConnectionManager.HConnectionImplementation)table.getConnection(); + assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); - conn.deleteCachedLocation(TABLE_NAME, ROW); + assertNotNull(conn.getCachedLocation(TABLE_NAME.clone(), ROW.clone())); + assertNotNull(conn.getCachedLocation( + Bytes.toString(TABLE_NAME).getBytes() , Bytes.toString(ROW).getBytes())); + + final int nextPort = conn.getCachedLocation(TABLE_NAME, ROW).getPort() + 1; + conn.updateCachedLocation(conn.getCachedLocation(TABLE_NAME, ROW), "127.0.0.1", nextPort); + Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW).getPort(), nextPort); + + conn.deleteCachedLocation(TABLE_NAME.clone(), ROW.clone()); HRegionLocation rl = conn.getCachedLocation(TABLE_NAME, ROW); assertNull("What is this location?? " + rl, rl); + + // We're now going to move the region and check that it works for the client + // First a new put to add the location in the cache + conn.clearRegionCache(TABLE_NAME); + Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME)); + Put put2 = new Put(ROW); + put2.add(FAM_NAM, ROW, ROW); + table.put(put2); + assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); + + // We can wait for all regions to be onlines, that makes log reading easier when debugging + while (!TEST_UTIL.getMiniHBaseCluster().getMaster(). + getAssignmentManager().getRegionsInTransition().isEmpty()) { + } + + // Now moving the region to the second server + TEST_UTIL.getHBaseAdmin().balanceSwitch(false); + HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW); + byte[] regionName = toMove.getRegionInfo().getRegionName(); + + // Choose the other server. + int curServerId = TEST_UTIL.getHBaseCluster().getServerWith( regionName ); + int destServerId = (curServerId == 0 ? 1 : 0); + + HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId); + HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId); + + ServerName destServerName = destServer.getServerName(); + + // Check that we are in the expected state + Assert.assertTrue(curServer != destServer); + Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName())); + Assert.assertFalse( toMove.getPort() == destServerName.getPort()); + Assert.assertNotNull(curServer.getOnlineRegion(regionName)); + Assert.assertNull(destServer.getOnlineRegion(regionName)); + + // Moving. It's possible that we don't have all the regions online at this point, so + // the test must depends only on the region we're looking at. + LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString()); + TEST_UTIL.getHBaseAdmin().move( + toMove.getRegionInfo().getEncodedNameAsBytes(), + destServerName.getServerName().getBytes() + ); + + while ( destServer.getOnlineRegion(regionName) == null ){ + // wait for the move to be finished + } + + + // Check our new state. + Assert.assertNull(curServer.getOnlineRegion(regionName)); + Assert.assertNotNull(destServer.getOnlineRegion(regionName)); + LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString()); + + // Cache was NOT updated and points to the wrong server + Assert.assertFalse( conn.getCachedLocation(TABLE_NAME, ROW).getPort() == destServerName.getPort()); + + // Hijack the number of retry to fail immediately instead of retrying: there will be no new + // connection to the master + Field numRetries = conn.getClass().getDeclaredField("numRetries"); + numRetries.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(numRetries, numRetries.getModifiers() & ~Modifier.FINAL); + final int prevNumRetriesVal = (Integer)numRetries.get(conn); + numRetries.set(conn, 1); + + // We do a put and expect the cache to be updated, even if we don't retry + LOG.info("Put starting"); + Put put3 = new Put(ROW); + put3.add(FAM_NAM, ROW, ROW); + try { + table.put(put3); + Assert.assertFalse("Unreachable point", true); + }catch (Throwable e){ + LOG.info("Put done, expected exception caught: "+e.getClass()); + } + Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); + Assert.assertEquals( + "Previous server was "+curServer.getServerName().getHostAndPort(), + destServerName.getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getPort()); + + + // We move it back to do another test with a scan + LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString()); + TEST_UTIL.getHBaseAdmin().move( + toMove.getRegionInfo().getEncodedNameAsBytes(), + curServer.getServerName().getServerName().getBytes() + ); + + while ( curServer.getOnlineRegion(regionName) == null ){ + // wait for the move to be finished + } + + // Check our new state. + Assert.assertNotNull(curServer.getOnlineRegion(regionName)); + Assert.assertNull(destServer.getOnlineRegion(regionName)); + LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString()); + + // Cache was NOT updated and points to the wrong server + Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getPort() == + curServer.getServerName().getPort()); + + + Scan sc = new Scan(); + sc.setStopRow(ROW); + sc.setStopRow(ROW); + + try { + ResultScanner rs = table.getScanner(sc); + while (rs.next() != null){} + Assert.assertFalse("Unreachable point", true); + }catch (Throwable e){ + LOG.info("Put done, expected exception caught: "+e.getClass()); + } + + // Cache is updated with the right value. + Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); + Assert.assertEquals( + "Previous server was "+destServer.getServerName().getHostAndPort(), + curServer.getServerName().getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getPort()); + + numRetries.set(conn, prevNumRetriesVal); table.close(); } diff --git a/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index f3168d1f328..69ccc652425 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -250,7 +250,7 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer } @Override - public boolean removeFromOnlineRegions(String encodedRegionName) { + public boolean removeFromOnlineRegions(String encodedRegionName, ServerName destination) { // TODO Auto-generated method stub return false; } @@ -508,4 +508,4 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer // TODO Auto-generated method stub return null; } -} \ No newline at end of file +} diff --git a/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java b/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java index 7d0275960cf..3f61cfbd5b8 100644 --- a/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java +++ b/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java @@ -50,7 +50,7 @@ public class MockRegionServerServices implements RegionServerServices { private HFileSystem hfs = null; @Override - public boolean removeFromOnlineRegions(String encodedRegionName) { + public boolean removeFromOnlineRegions(String encodedRegionName, ServerName destination) { return this.regions.remove(encodedRegionName) != null; }