HBASE-6611 Forcing region state offline cause double assignment

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1400358 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jxiang 2012-10-20 03:57:49 +00:00
parent 312e9187ae
commit a5bd102cd8
26 changed files with 1797 additions and 1287 deletions

View File

@ -54,7 +54,7 @@ public class RegionTransition {
}
public ServerName getServerName() {
return ProtobufUtil.toServerName(this.rt.getOriginServerName());
return ProtobufUtil.toServerName(this.rt.getServerName());
}
public long getCreateTime() {
@ -105,7 +105,7 @@ public class RegionTransition {
setHostName(sn.getHostname()).setPort(sn.getPort()).setStartCode(sn.getStartcode()).build();
ZooKeeperProtos.RegionTransition.Builder builder = ZooKeeperProtos.RegionTransition.newBuilder().
setEventTypeCode(type.getCode()).setRegionName(ByteString.copyFrom(regionName)).
setOriginServerName(pbsn);
setServerName(pbsn);
builder.setCreateTime(System.currentTimeMillis());
if (payload != null) builder.setPayload(ByteString.copyFrom(payload));
return new RegionTransition(builder.build());

View File

@ -42,7 +42,7 @@ public class AssignCallable implements Callable<Object> {
@Override
public Object call() throws Exception {
assignmentManager.assign(hri, true, true, true);
assignmentManager.assign(hri, true, true);
return null;
}
}

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.HashSet;
@ -60,19 +59,6 @@ public class GeneralBulkAssigner extends BulkAssigner {
this.assignmentManager = am;
}
@Override
public boolean bulkAssign(boolean sync) throws InterruptedException,
IOException {
// Disable timing out regions in transition up in zk while bulk assigning.
this.assignmentManager.timeoutMonitor.bulkAssign(true);
try {
return super.bulkAssign(sync);
} finally {
// Re-enable timing out regions in transition up in zk.
this.assignmentManager.timeoutMonitor.bulkAssign(false);
}
}
@Override
protected String getThreadNamePrefix() {
return this.server.getServerName() + "-GeneralBulkAssigner";
@ -204,7 +190,7 @@ public class GeneralBulkAssigner extends BulkAssigner {
reassigningRegions.addAll(failedPlans.remove(e.getKey()));
}
for (HRegionInfo region : reassigningRegions) {
assignmentManager.assign(region, true, true);
assignmentManager.invokeAssign(region);
}
return reassigningRegions.size();
}

View File

@ -673,6 +673,7 @@ Server {
if (!masterRecovery) {
this.assignmentManager.startTimeOutMonitor();
}
// TODO: Should do this in background rather than block master startup
status.setStatus("Splitting logs after master startup");
splitLogAfterStartup(this.fileSystemManager);
@ -2136,7 +2137,7 @@ Server {
return arr;
}
}
assignRegion(regionInfo);
assignmentManager.assign(regionInfo, true, true);
if (cpHost != null) {
cpHost.postAssign(regionInfo);
}

View File

@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* Callback handler for creating unassigned offline znodes
* used during bulk assign, async setting region to offline.
*/
@InterfaceAudience.Private
public class OfflineCallback implements StringCallback {
private final Log LOG = LogFactory.getLog(OfflineCallback.class);
private final ExistCallback callBack;
private final ZooKeeperWatcher zkw;
private final ServerName destination;
OfflineCallback(final RegionStates regionStates,
final ZooKeeperWatcher zkw, final ServerName destination,
final AtomicInteger counter, final Map<String, Integer> offlineNodesVersions) {
this.callBack = new ExistCallback(
regionStates, counter, destination, offlineNodesVersions);
this.destination = destination;
this.zkw = zkw;
}
@Override
public void processResult(int rc, String path, Object ctx, String name) {
if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
LOG.warn("Node for " + path + " already exists");
} else if (rc != 0) {
// This is result code. If non-zero, need to resubmit.
LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
"FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
this.zkw.abort("Connectionloss writing unassigned at " + path +
", rc=" + rc, null);
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("rs=" + (RegionState)ctx
+ ", server=" + this.destination.toString());
}
// Async exists to set a watcher so we'll get triggered when
// unassigned node changes.
ZooKeeper zk = this.zkw.getRecoverableZooKeeper().getZooKeeper();
zk.exists(path, this.zkw, callBack, ctx);
}
/**
* Callback handler for the exists call that sets watcher on unassigned znodes.
* Used during bulk assign on startup.
*/
static class ExistCallback implements StatCallback {
private final Log LOG = LogFactory.getLog(ExistCallback.class);
private final Map<String, Integer> offlineNodesVersions;
private final RegionStates regionStates;
private final AtomicInteger counter;
private ServerName destination;
ExistCallback(final RegionStates regionStates,
final AtomicInteger counter, ServerName destination,
final Map<String, Integer> offlineNodesVersions) {
this.offlineNodesVersions = offlineNodesVersions;
this.regionStates = regionStates;
this.counter = counter;
this.destination = destination;
}
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (rc != 0) {
// This is result code. If non-zero, need to resubmit.
LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
"FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
return;
}
RegionState state = (RegionState)ctx;
if (LOG.isDebugEnabled()) {
LOG.debug("rs=" + state
+ ", server=" + this.destination.toString());
}
// Transition RegionState to PENDING_OPEN here in master; means we've
// sent the open. We're a little ahead of ourselves here since we've not
// yet sent out the actual open but putting this state change after the
// call to open risks our writing PENDING_OPEN after state has been moved
// to OPENING by the regionserver.
HRegionInfo region = state.getRegion();
offlineNodesVersions.put(
region.getEncodedName(), Integer.valueOf(stat.getVersion()));
regionStates.updateRegionState(region,
RegionState.State.PENDING_OPEN, destination);
this.counter.addAndGet(1);
}
}
}

View File

@ -126,6 +126,14 @@ public class RegionState implements org.apache.hadoop.io.Writable {
return state == State.SPLIT;
}
public boolean isPendingOpenOrOpeningOnServer(final ServerName sn) {
return isOnServer(sn) && (isPendingOpen() || isOpening());
}
public boolean isPendingCloseOrClosingOnServer(final ServerName sn) {
return isOnServer(sn) && (isPendingClose() || isClosing());
}
@Override
public String toString() {
return "{" + region.getRegionNameAsString()
@ -234,6 +242,10 @@ public class RegionState implements org.apache.hadoop.io.Writable {
return new RegionState(HRegionInfo.convert(proto.getRegionInfo()),state,proto.getStamp(),null);
}
private boolean isOnServer(final ServerName sn) {
return serverName != null && serverName.equals(sn);
}
/**
* @deprecated Writables are going away
*/

View File

@ -205,15 +205,6 @@ public class RegionStates {
return updateRegionState(hri, state, serverName);
}
/**
* Update a region state. If it is not splitting,
* it will be put in transition if not already there.
*/
public synchronized RegionState updateRegionState(
final HRegionInfo hri, final State state, final ServerName serverName) {
return updateRegionState(hri, state, System.currentTimeMillis(), serverName);
}
/**
* Update a region state. If it is not splitting,
* it will be put in transition if not already there.
@ -234,15 +225,15 @@ public class RegionStates {
return null;
}
return updateRegionState(regionInfo, state,
transition.getCreateTime(), transition.getServerName());
transition.getServerName());
}
/**
* Update a region state. If it is not splitting,
* it will be put in transition if not already there.
*/
public synchronized RegionState updateRegionState(final HRegionInfo hri,
final State state, final long stamp, final ServerName serverName) {
public synchronized RegionState updateRegionState(
final HRegionInfo hri, final State state, final ServerName serverName) {
ServerName newServerName = serverName;
if (serverName != null &&
(state == State.CLOSED || state == State.OFFLINE)) {
@ -252,7 +243,8 @@ public class RegionStates {
}
String regionName = hri.getEncodedName();
RegionState regionState = new RegionState(hri, state, stamp, newServerName);
RegionState regionState = new RegionState(
hri, state, System.currentTimeMillis(), newServerName);
RegionState oldState = regionStates.put(regionName, regionState);
LOG.info("Region " + hri + " transitioned from " + oldState + " to " + regionState);
if (state != State.SPLITTING && (newServerName != null

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.ServiceException;
@ -602,11 +603,11 @@ public class ServerManager {
* Open should not fail but can if server just crashed.
* <p>
* @param server server to open a region
* @param regions regions to open
* @param regionOpenInfos info of a list of regions to open
* @return a list of region opening states
*/
public List<RegionOpeningState> sendRegionOpen(ServerName server,
List<HRegionInfo> regions)
List<Pair<HRegionInfo, Integer>> regionOpenInfos)
throws IOException {
AdminProtocol admin = getServerConnection(server);
if (admin == null) {
@ -615,8 +616,14 @@ public class ServerManager {
return null;
}
OpenRegionResponse response = ProtobufUtil.openRegion(admin, regions);
OpenRegionRequest request =
RequestConverter.buildOpenRegionRequest(regionOpenInfos);
try {
OpenRegionResponse response = admin.openRegion(null, request);
return ResponseConverter.getRegionOpeningStateList(response);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
/**
@ -634,7 +641,7 @@ public class ServerManager {
* @throws IOException
*/
public boolean sendRegionClose(ServerName server, HRegionInfo region,
int versionOfClosingNode, ServerName dest) throws IOException {
int versionOfClosingNode, ServerName dest, boolean transitionInZK) throws IOException {
if (server == null) throw new NullPointerException("Passed server is null");
AdminProtocol admin = getServerConnection(server);
if (admin == null) {
@ -644,12 +651,12 @@ public class ServerManager {
" failed because no RPC connection found to this server");
}
return ProtobufUtil.closeRegion(admin, region.getRegionName(),
versionOfClosingNode, dest);
versionOfClosingNode, dest, transitionInZK);
}
public boolean sendRegionClose(ServerName server, HRegionInfo region,
int versionOfClosingNode) throws IOException {
return sendRegionClose(server, region, versionOfClosingNode, null);
public boolean sendRegionClose(ServerName server,
HRegionInfo region, int versionOfClosingNode) throws IOException {
return sendRegionClose(server, region, versionOfClosingNode, null, true);
}
/**

View File

@ -228,13 +228,8 @@ public class EnableTableHandler extends EventHandler {
final HRegionInfo hri = region;
pool.execute(Trace.wrap(new Runnable() {
public void run() {
if (retainAssignment) {
// Already plan is populated.
assignmentManager.assign(hri, true, false, false);
} else {
assignmentManager.assign(hri, true);
}
}
}));
}
} else {

View File

@ -100,8 +100,7 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf
RegionState regionState = this.assignmentManager.getRegionStates()
.getRegionTransitionState(regionInfo.getEncodedName());
boolean openedNodeDeleted = false;
if (regionState != null
&& regionState.getState().equals(RegionState.State.OPEN)) {
if (regionState != null && regionState.isOpened()) {
openedNodeDeleted = deleteOpenedNode(expectedVersion);
if (!openedNodeDeleted) {
LOG.error("The znode of region " + regionInfo.getRegionNameAsString()

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.protobuf;
import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
@ -32,11 +34,11 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DeserializationException;
@ -65,8 +67,8 @@ import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
@ -82,7 +84,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRespo
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
@ -108,12 +109,13 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValu
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.DeleteType;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@ -128,18 +130,16 @@ import org.apache.hadoop.hbase.util.Pair;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.*;
/**
* Protobufs utility.
*/
@SuppressWarnings("deprecation")
public final class ProtobufUtil {
private ProtobufUtil() {
@ -983,6 +983,7 @@ public final class ProtobufUtil {
* @param proto the protocol buffer Comparator to convert
* @return the converted ByteArrayComparable
*/
@SuppressWarnings("unchecked")
public static ByteArrayComparable toComparator(ComparatorProtos.Comparator proto)
throws IOException {
String type = proto.getName();
@ -1007,6 +1008,7 @@ public final class ProtobufUtil {
* @param proto the protocol buffer Filter to convert
* @return the converted Filter
*/
@SuppressWarnings("unchecked")
public static Filter toFilter(HBaseProtos.Filter proto) throws IOException {
String type = proto.getName();
final byte [] value = proto.getSerializedFilter().toByteArray();
@ -1349,6 +1351,7 @@ public final class ProtobufUtil {
}
}
@SuppressWarnings("unchecked")
public static <T extends Service> T newServiceStub(Class<T> service, RpcChannel channel)
throws Exception {
return (T)Methods.call(service, null, "newStub",
@ -1400,28 +1403,6 @@ public final class ProtobufUtil {
}
}
/**
* A helper to close a region given a region name
* using admin protocol.
*
* @param admin
* @param regionName
* @param versionOfClosingNode
* @return true if the region is closed
* @throws IOException
*/
public static boolean closeRegion(final AdminProtocol admin,
final byte[] regionName, final int versionOfClosingNode) throws IOException {
CloseRegionRequest closeRegionRequest =
RequestConverter.buildCloseRegionRequest(regionName, versionOfClosingNode);
try {
CloseRegionResponse response = admin.closeRegion(null, closeRegionRequest);
return ResponseConverter.isClosed(response);
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
/**
* A helper to close a region given a region name
* using admin protocol.
@ -1433,9 +1414,11 @@ public final class ProtobufUtil {
* @throws IOException
*/
public static boolean closeRegion(final AdminProtocol admin, final byte[] regionName,
final int versionOfClosingNode, final ServerName destinationServer) throws IOException {
final int versionOfClosingNode, final ServerName destinationServer,
final boolean transitionInZK) throws IOException {
CloseRegionRequest closeRegionRequest =
RequestConverter.buildCloseRegionRequest(regionName, versionOfClosingNode, destinationServer);
RequestConverter.buildCloseRegionRequest(
regionName, versionOfClosingNode, destinationServer, transitionInZK);
try {
CloseRegionResponse response = admin.closeRegion(null, closeRegionRequest);
return ResponseConverter.isClosed(response);
@ -1462,25 +1445,6 @@ public final class ProtobufUtil {
}
}
/**
* A helper to open a list of regions using admin protocol.
*
* @param admin
* @param regions
* @return OpenRegionResponse
* @throws IOException
*/
public static OpenRegionResponse openRegion(final AdminProtocol admin,
final List<HRegionInfo> regions) throws IOException {
OpenRegionRequest request =
RequestConverter.buildOpenRegionRequest(regions);
try {
return admin.openRegion(null, request);
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
/**
* A helper to get the all the online regions on a region
* server using admin protocol.
@ -1839,6 +1803,28 @@ public final class ProtobufUtil {
return perms;
}
/**
* Find the HRegion encoded name based on a region specifier
*
* @param regionSpecifier the region specifier
* @return the corresponding region's encoded name
* @throws DoNotRetryIOException if the specifier type is unsupported
*/
public static String getRegionEncodedName(
final RegionSpecifier regionSpecifier) throws DoNotRetryIOException {
byte[] value = regionSpecifier.getValue().toByteArray();
RegionSpecifierType type = regionSpecifier.getType();
switch (type) {
case REGION_NAME:
return HRegionInfo.encodeRegionName(value);
case ENCODED_REGION_NAME:
return Bytes.toString(value);
default:
throw new DoNotRetryIOException(
"Unsupported region specifier type: " + type);
}
}
/**
* Unwraps an exception from a protobuf service into the underlying (expected) IOException.
* This method will <strong>always</strong> throw an exception.

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoReque
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
@ -82,6 +83,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnRequest;
@ -90,17 +92,16 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTable
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@ -115,6 +116,7 @@ import com.google.protobuf.ByteString;
* or build components for protocol buffer requests.
*/
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public final class RequestConverter {
private RequestConverter() {
@ -612,29 +614,21 @@ public final class RequestConverter {
/**
* Create a protocol buffer OpenRegionRequest to open a list of regions
*
* @param regions the list of regions to open
* @param regionOpenInfos info of a list of regions to open
* @return a protocol buffer OpenRegionRequest
*/
public static OpenRegionRequest
buildOpenRegionRequest(final List<HRegionInfo> regions) {
buildOpenRegionRequest(final List<Pair<HRegionInfo, Integer>> regionOpenInfos) {
OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
for (HRegionInfo region: regions) {
builder.addRegion(HRegionInfo.convert(region));
for (Pair<HRegionInfo, Integer> regionOpenInfo: regionOpenInfos) {
Integer second = regionOpenInfo.getSecond();
int versionOfOfflineNode = second == null ? -1 : second.intValue();
builder.addOpenInfo(buildRegionOpenInfo(
regionOpenInfo.getFirst(), versionOfOfflineNode));
}
return builder.build();
}
/**
* Create a protocol buffer OpenRegionRequest for a given region
*
* @param region the region to open
* @return a protocol buffer OpenRegionRequest
*/
public static OpenRegionRequest
buildOpenRegionRequest(final HRegionInfo region) {
return buildOpenRegionRequest(region, -1);
}
/**
* Create a protocol buffer OpenRegionRequest for a given region
*
@ -645,10 +639,7 @@ public final class RequestConverter {
public static OpenRegionRequest buildOpenRegionRequest(
final HRegionInfo region, final int versionOfOfflineNode) {
OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
builder.addRegion(HRegionInfo.convert(region));
if (versionOfOfflineNode >= 0) {
builder.setVersionOfOfflineNode(versionOfOfflineNode);
}
builder.addOpenInfo(buildRegionOpenInfo(region, versionOfOfflineNode));
return builder.build();
}
@ -669,32 +660,15 @@ public final class RequestConverter {
return builder.build();
}
/**
* Create a CloseRegionRequest for a given region name
*
* @param regionName the name of the region to close
* @param versionOfClosingNode
* the version of znode to compare when RS transitions the znode from
* CLOSING state.
* @return a CloseRegionRequest
*/
public static CloseRegionRequest buildCloseRegionRequest(
final byte[] regionName, final int versionOfClosingNode) {
CloseRegionRequest.Builder builder = CloseRegionRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setVersionOfClosingNode(versionOfClosingNode);
return builder.build();
}
public static CloseRegionRequest buildCloseRegionRequest(
final byte[] regionName, final int versionOfClosingNode, ServerName destinationServer) {
final byte[] regionName, final int versionOfClosingNode,
ServerName destinationServer, final boolean transitionInZK) {
CloseRegionRequest.Builder builder = CloseRegionRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setVersionOfClosingNode(versionOfClosingNode);
builder.setTransitionInZK(transitionInZK);
if (destinationServer != null){
builder.setDestinationServer(ProtobufUtil.toServerName( destinationServer) );
}
@ -1153,4 +1127,17 @@ public final class RequestConverter {
return GetLastFlushedSequenceIdRequest.newBuilder().setRegionName(
ByteString.copyFrom(regionName)).build();
}
/**
* Create a RegionOpenInfo based on given region info and version of offline node
*/
private static RegionOpenInfo buildRegionOpenInfo(
final HRegionInfo region, final int versionOfOfflineNode) {
RegionOpenInfo.Builder builder = RegionOpenInfo.newBuilder();
builder.setRegion(HRegionInfo.convert(region));
if (versionOfOfflineNode >= 0) {
builder.setVersionOfOfflineNode(versionOfOfflineNode);
}
return builder.build();
}
}

View File

@ -21,13 +21,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.google.protobuf.RpcController;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.UserPermissionsResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
@ -47,8 +45,7 @@ import org.apache.hadoop.hbase.security.access.UserPermission;
import org.apache.hadoop.util.StringUtils;
import com.google.protobuf.ByteString;
import static org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.UserPermissionsResponse;
import com.google.protobuf.RpcController;
/**
* Helper utility to build protocol buffer responses,

View File

@ -1385,10 +1385,10 @@ public final class ZooKeeperProtos {
boolean hasCreateTime();
long getCreateTime();
// optional .ServerName originServerName = 4;
boolean hasOriginServerName();
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getOriginServerName();
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getOriginServerNameOrBuilder();
// required .ServerName serverName = 4;
boolean hasServerName();
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getServerName();
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerNameOrBuilder();
// optional bytes payload = 5;
boolean hasPayload();
@ -1453,17 +1453,17 @@ public final class ZooKeeperProtos {
return createTime_;
}
// optional .ServerName originServerName = 4;
public static final int ORIGINSERVERNAME_FIELD_NUMBER = 4;
private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName originServerName_;
public boolean hasOriginServerName() {
// required .ServerName serverName = 4;
public static final int SERVERNAME_FIELD_NUMBER = 4;
private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName serverName_;
public boolean hasServerName() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getOriginServerName() {
return originServerName_;
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getServerName() {
return serverName_;
}
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getOriginServerNameOrBuilder() {
return originServerName_;
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerNameOrBuilder() {
return serverName_;
}
// optional bytes payload = 5;
@ -1480,7 +1480,7 @@ public final class ZooKeeperProtos {
eventTypeCode_ = 0;
regionName_ = com.google.protobuf.ByteString.EMPTY;
createTime_ = 0L;
originServerName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
payload_ = com.google.protobuf.ByteString.EMPTY;
}
private byte memoizedIsInitialized = -1;
@ -1500,11 +1500,13 @@ public final class ZooKeeperProtos {
memoizedIsInitialized = 0;
return false;
}
if (hasOriginServerName()) {
if (!getOriginServerName().isInitialized()) {
if (!hasServerName()) {
memoizedIsInitialized = 0;
return false;
}
if (!getServerName().isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
@ -1523,7 +1525,7 @@ public final class ZooKeeperProtos {
output.writeUInt64(3, createTime_);
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeMessage(4, originServerName_);
output.writeMessage(4, serverName_);
}
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeBytes(5, payload_);
@ -1551,7 +1553,7 @@ public final class ZooKeeperProtos {
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(4, originServerName_);
.computeMessageSize(4, serverName_);
}
if (((bitField0_ & 0x00000010) == 0x00000010)) {
size += com.google.protobuf.CodedOutputStream
@ -1595,10 +1597,10 @@ public final class ZooKeeperProtos {
result = result && (getCreateTime()
== other.getCreateTime());
}
result = result && (hasOriginServerName() == other.hasOriginServerName());
if (hasOriginServerName()) {
result = result && getOriginServerName()
.equals(other.getOriginServerName());
result = result && (hasServerName() == other.hasServerName());
if (hasServerName()) {
result = result && getServerName()
.equals(other.getServerName());
}
result = result && (hasPayload() == other.hasPayload());
if (hasPayload()) {
@ -1626,9 +1628,9 @@ public final class ZooKeeperProtos {
hash = (37 * hash) + CREATETIME_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getCreateTime());
}
if (hasOriginServerName()) {
hash = (37 * hash) + ORIGINSERVERNAME_FIELD_NUMBER;
hash = (53 * hash) + getOriginServerName().hashCode();
if (hasServerName()) {
hash = (37 * hash) + SERVERNAME_FIELD_NUMBER;
hash = (53 * hash) + getServerName().hashCode();
}
if (hasPayload()) {
hash = (37 * hash) + PAYLOAD_FIELD_NUMBER;
@ -1742,7 +1744,7 @@ public final class ZooKeeperProtos {
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getOriginServerNameFieldBuilder();
getServerNameFieldBuilder();
}
}
private static Builder create() {
@ -1757,10 +1759,10 @@ public final class ZooKeeperProtos {
bitField0_ = (bitField0_ & ~0x00000002);
createTime_ = 0L;
bitField0_ = (bitField0_ & ~0x00000004);
if (originServerNameBuilder_ == null) {
originServerName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
if (serverNameBuilder_ == null) {
serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
} else {
originServerNameBuilder_.clear();
serverNameBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000008);
payload_ = com.google.protobuf.ByteString.EMPTY;
@ -1818,10 +1820,10 @@ public final class ZooKeeperProtos {
if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
to_bitField0_ |= 0x00000008;
}
if (originServerNameBuilder_ == null) {
result.originServerName_ = originServerName_;
if (serverNameBuilder_ == null) {
result.serverName_ = serverName_;
} else {
result.originServerName_ = originServerNameBuilder_.build();
result.serverName_ = serverNameBuilder_.build();
}
if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
to_bitField0_ |= 0x00000010;
@ -1852,8 +1854,8 @@ public final class ZooKeeperProtos {
if (other.hasCreateTime()) {
setCreateTime(other.getCreateTime());
}
if (other.hasOriginServerName()) {
mergeOriginServerName(other.getOriginServerName());
if (other.hasServerName()) {
mergeServerName(other.getServerName());
}
if (other.hasPayload()) {
setPayload(other.getPayload());
@ -1875,11 +1877,13 @@ public final class ZooKeeperProtos {
return false;
}
if (hasOriginServerName()) {
if (!getOriginServerName().isInitialized()) {
if (!hasServerName()) {
return false;
}
if (!getServerName().isInitialized()) {
return false;
}
return true;
}
@ -1924,11 +1928,11 @@ public final class ZooKeeperProtos {
}
case 34: {
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder subBuilder = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder();
if (hasOriginServerName()) {
subBuilder.mergeFrom(getOriginServerName());
if (hasServerName()) {
subBuilder.mergeFrom(getServerName());
}
input.readMessage(subBuilder, extensionRegistry);
setOriginServerName(subBuilder.buildPartial());
setServerName(subBuilder.buildPartial());
break;
}
case 42: {
@ -2008,94 +2012,94 @@ public final class ZooKeeperProtos {
return this;
}
// optional .ServerName originServerName = 4;
private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName originServerName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
// required .ServerName serverName = 4;
private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> originServerNameBuilder_;
public boolean hasOriginServerName() {
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> serverNameBuilder_;
public boolean hasServerName() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getOriginServerName() {
if (originServerNameBuilder_ == null) {
return originServerName_;
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getServerName() {
if (serverNameBuilder_ == null) {
return serverName_;
} else {
return originServerNameBuilder_.getMessage();
return serverNameBuilder_.getMessage();
}
}
public Builder setOriginServerName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) {
if (originServerNameBuilder_ == null) {
public Builder setServerName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) {
if (serverNameBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
originServerName_ = value;
serverName_ = value;
onChanged();
} else {
originServerNameBuilder_.setMessage(value);
serverNameBuilder_.setMessage(value);
}
bitField0_ |= 0x00000008;
return this;
}
public Builder setOriginServerName(
public Builder setServerName(
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder builderForValue) {
if (originServerNameBuilder_ == null) {
originServerName_ = builderForValue.build();
if (serverNameBuilder_ == null) {
serverName_ = builderForValue.build();
onChanged();
} else {
originServerNameBuilder_.setMessage(builderForValue.build());
serverNameBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000008;
return this;
}
public Builder mergeOriginServerName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) {
if (originServerNameBuilder_ == null) {
public Builder mergeServerName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) {
if (serverNameBuilder_ == null) {
if (((bitField0_ & 0x00000008) == 0x00000008) &&
originServerName_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance()) {
originServerName_ =
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(originServerName_).mergeFrom(value).buildPartial();
serverName_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance()) {
serverName_ =
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(serverName_).mergeFrom(value).buildPartial();
} else {
originServerName_ = value;
serverName_ = value;
}
onChanged();
} else {
originServerNameBuilder_.mergeFrom(value);
serverNameBuilder_.mergeFrom(value);
}
bitField0_ |= 0x00000008;
return this;
}
public Builder clearOriginServerName() {
if (originServerNameBuilder_ == null) {
originServerName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
public Builder clearServerName() {
if (serverNameBuilder_ == null) {
serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
onChanged();
} else {
originServerNameBuilder_.clear();
serverNameBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder getOriginServerNameBuilder() {
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder getServerNameBuilder() {
bitField0_ |= 0x00000008;
onChanged();
return getOriginServerNameFieldBuilder().getBuilder();
return getServerNameFieldBuilder().getBuilder();
}
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getOriginServerNameOrBuilder() {
if (originServerNameBuilder_ != null) {
return originServerNameBuilder_.getMessageOrBuilder();
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerNameOrBuilder() {
if (serverNameBuilder_ != null) {
return serverNameBuilder_.getMessageOrBuilder();
} else {
return originServerName_;
return serverName_;
}
}
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>
getOriginServerNameFieldBuilder() {
if (originServerNameBuilder_ == null) {
originServerNameBuilder_ = new com.google.protobuf.SingleFieldBuilder<
getServerNameFieldBuilder() {
if (serverNameBuilder_ == null) {
serverNameBuilder_ = new com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>(
originServerName_,
serverName_,
getParentForChildren(),
isClean());
originServerName_ = null;
serverName_ = null;
}
return originServerNameBuilder_;
return serverNameBuilder_;
}
// optional bytes payload = 5;
@ -4960,25 +4964,24 @@ public final class ZooKeeperProtos {
"\n\017ZooKeeper.proto\032\013hbase.proto\"/\n\020RootRe" +
"gionServer\022\033\n\006server\030\001 \002(\0132\013.ServerName\"" +
"%\n\006Master\022\033\n\006master\030\001 \002(\0132\013.ServerName\"\036" +
"\n\tClusterUp\022\021\n\tstartDate\030\001 \002(\t\"\211\001\n\020Regio" +
"\n\tClusterUp\022\021\n\tstartDate\030\001 \002(\t\"\203\001\n\020Regio" +
"nTransition\022\025\n\reventTypeCode\030\001 \002(\r\022\022\n\nre" +
"gionName\030\002 \002(\014\022\022\n\ncreateTime\030\003 \002(\004\022%\n\020or" +
"iginServerName\030\004 \001(\0132\013.ServerName\022\017\n\007pay" +
"load\030\005 \001(\014\"\230\001\n\014SplitLogTask\022\"\n\005state\030\001 \002" +
"(\0162\023.SplitLogTask.State\022\037\n\nserverName\030\002 " +
"\002(\0132\013.ServerName\"C\n\005State\022\016\n\nUNASSIGNED\020",
"\000\022\t\n\005OWNED\020\001\022\014\n\010RESIGNED\020\002\022\010\n\004DONE\020\003\022\007\n\003" +
"ERR\020\004\"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table.St" +
"ate:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DI" +
"SABLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"%\n" +
"\017ReplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\"^\n\020" +
"ReplicationState\022&\n\005state\030\001 \002(\0162\027.Replic" +
"ationState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014" +
"\n\010DISABLED\020\001\"+\n\027ReplicationHLogPosition\022" +
"\020\n\010position\030\001 \002(\003\"$\n\017ReplicationLock\022\021\n\t" +
"lockOwner\030\001 \002(\tBE\n*org.apache.hadoop.hba",
"se.protobuf.generatedB\017ZooKeeperProtosH\001" +
"\210\001\001\240\001\001"
"gionName\030\002 \002(\014\022\022\n\ncreateTime\030\003 \002(\004\022\037\n\nse" +
"rverName\030\004 \002(\0132\013.ServerName\022\017\n\007payload\030\005" +
" \001(\014\"\230\001\n\014SplitLogTask\022\"\n\005state\030\001 \002(\0162\023.S" +
"plitLogTask.State\022\037\n\nserverName\030\002 \002(\0132\013." +
"ServerName\"C\n\005State\022\016\n\nUNASSIGNED\020\000\022\t\n\005O",
"WNED\020\001\022\014\n\010RESIGNED\020\002\022\010\n\004DONE\020\003\022\007\n\003ERR\020\004\"" +
"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table.State:\007E" +
"NABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED" +
"\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"%\n\017Repli" +
"cationPeer\022\022\n\nclusterkey\030\001 \002(\t\"^\n\020Replic" +
"ationState\022&\n\005state\030\001 \002(\0162\027.ReplicationS" +
"tate.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
"BLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010pos" +
"ition\030\001 \002(\003\"$\n\017ReplicationLock\022\021\n\tlockOw" +
"ner\030\001 \002(\tBE\n*org.apache.hadoop.hbase.pro",
"tobuf.generatedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -5014,7 +5017,7 @@ public final class ZooKeeperProtos {
internal_static_RegionTransition_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RegionTransition_descriptor,
new java.lang.String[] { "EventTypeCode", "RegionName", "CreateTime", "OriginServerName", "Payload", },
new java.lang.String[] { "EventTypeCode", "RegionName", "CreateTime", "ServerName", "Payload", },
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition.class,
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionTransition.Builder.class);
internal_static_SplitLogTask_descriptor =

View File

@ -135,6 +135,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRespo
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
@ -171,7 +172,6 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@ -228,7 +228,6 @@ import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.base.Function;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@ -2595,14 +2594,13 @@ public class HRegionServer implements ClientProtocol,
}
}
protected void checkIfRegionInTransition(HRegionInfo region,
protected void checkIfRegionInTransition(byte[] regionEncodedName,
String currentAction) throws RegionAlreadyInTransitionException {
byte[] encodedName = region.getEncodedNameAsBytes();
if (this.regionsInTransitionInRS.containsKey(encodedName)) {
boolean openAction = this.regionsInTransitionInRS.get(encodedName);
if (this.regionsInTransitionInRS.containsKey(regionEncodedName)) {
boolean openAction = this.regionsInTransitionInRS.get(regionEncodedName);
// The below exception message will be used in master.
throw new RegionAlreadyInTransitionException("Received:" + currentAction +
" for the region:" + region.getRegionNameAsString() +
" for the region:" + Bytes.toString(regionEncodedName) +
" ,which we are already trying to " +
(openAction ? OPEN : CLOSE)+ ".");
}
@ -3568,12 +3566,8 @@ public class HRegionServer implements ClientProtocol,
*/
@Override
@QosPriority(priority=HConstants.HIGH_QOS)
public OpenRegionResponse openRegion(final RpcController controller, final OpenRegionRequest request)
throws ServiceException {
int versionOfOfflineNode = -1;
if (request.hasVersionOfOfflineNode()) {
versionOfOfflineNode = request.getVersionOfOfflineNode();
}
public OpenRegionResponse openRegion(final RpcController controller,
final OpenRegionRequest request) throws ServiceException {
try {
checkOpen();
} catch (IOException ie) {
@ -3581,13 +3575,18 @@ public class HRegionServer implements ClientProtocol,
}
requestCount.incrementAndGet();
OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
Map<String, HTableDescriptor> htds = new HashMap<String, HTableDescriptor>(
request.getRegionList().size());
boolean isBulkAssign = request.getRegionList().size() > 1;
for (RegionInfo regionInfo : request.getRegionList()) {
HRegionInfo region = HRegionInfo.convert(regionInfo);
int regionCount = request.getOpenInfoCount();
Map<String, HTableDescriptor> htds =
new HashMap<String, HTableDescriptor>(regionCount);
boolean isBulkAssign = regionCount > 1;
for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
int versionOfOfflineNode = -1;
if (regionOpenInfo.hasVersionOfOfflineNode()) {
versionOfOfflineNode = regionOpenInfo.getVersionOfOfflineNode();
}
try {
checkIfRegionInTransition(region, OPEN);
checkIfRegionInTransition(region.getEncodedNameAsBytes(), OPEN);
HRegion onlineRegion = getFromOnlineRegions(region.getEncodedName());
if (null != onlineRegion) {
// See HBASE-5094. Cross check with META if still this RS is owning
@ -3643,7 +3642,6 @@ public class HRegionServer implements ClientProtocol,
}
}
return builder.build();
}
/**
@ -3668,17 +3666,26 @@ public class HRegionServer implements ClientProtocol,
try {
checkOpen();
requestCount.incrementAndGet();
HRegion region = getRegion(request.getRegion());
CloseRegionResponse.Builder
builder = CloseRegionResponse.newBuilder();
String encodedRegionName =
ProtobufUtil.getRegionEncodedName(request.getRegion());
byte[] encodedName = Bytes.toBytes(encodedRegionName);
Boolean openAction = regionsInTransitionInRS.get(encodedName);
if (openAction != null) {
if (openAction.booleanValue()) {
regionsInTransitionInRS.replace(encodedName, openAction, Boolean.FALSE);
}
checkIfRegionInTransition(encodedName, CLOSE);
}
HRegion region = getRegionByEncodedName(encodedRegionName);
LOG.info("Received close region: " + region.getRegionNameAsString() +
". Version of ZK closing node:" + versionOfClosingNode +
". Destination server:" + sn);
HRegionInfo regionInfo = region.getRegionInfo();
checkIfRegionInTransition(regionInfo, CLOSE);
checkIfRegionInTransition(encodedName, CLOSE);
boolean closed = closeRegion(
regionInfo, false, zk, versionOfClosingNode, sn);
builder.setClosed(closed);
CloseRegionResponse.Builder builder =
CloseRegionResponse.newBuilder().setClosed(closed);
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
@ -3874,18 +3881,8 @@ public class HRegionServer implements ClientProtocol,
*/
protected HRegion getRegion(
final RegionSpecifier regionSpecifier) throws IOException {
byte[] value = regionSpecifier.getValue().toByteArray();
RegionSpecifierType type = regionSpecifier.getType();
checkOpen();
switch (type) {
case REGION_NAME:
return getRegion(value);
case ENCODED_REGION_NAME:
return getRegionByEncodedName(Bytes.toString(value));
default:
throw new DoNotRetryIOException(
"Unsupported region specifier type: " + type);
}
return getRegionByEncodedName(
ProtobufUtil.getRegionEncodedName(regionSpecifier));
}
/**

View File

@ -134,6 +134,14 @@ public class OpenRegionHandler extends EventHandler {
cleanupFailedOpen(region);
return;
}
// One more check to make sure we are opening instead of closing
if (!isRegionStillOpening()) {
LOG.warn("Open region aborted since it isn't opening any more");
cleanupFailedOpen(region);
return;
}
// Successful region open, and add it to OnlineRegions
this.rsServices.addToOnlineRegions(region);
@ -269,6 +277,10 @@ public class OpenRegionHandler extends EventHandler {
* @throws IOException
*/
private boolean transitionToOpened(final HRegion r) throws IOException {
if (!isRegionStillOpening()) {
LOG.warn("Open region aborted since it isn't opening any more");
return false;
}
boolean result = false;
HRegionInfo hri = r.getRegionInfo();
final String name = hri.getRegionNameAsString();
@ -364,6 +376,12 @@ public class OpenRegionHandler extends EventHandler {
if (region != null) region.close();
}
private boolean isRegionStillOpening() {
byte[] encodedName = regionInfo.getEncodedNameAsBytes();
Boolean action = rsServices.getRegionsInTransitionInRS().get(encodedName);
return action != null && action.booleanValue();
}
/**
* Transition ZK node from OFFLINE to OPENING.
* @param encodedName Name of the znode file (Region encodedName is the znode
@ -374,6 +392,10 @@ public class OpenRegionHandler extends EventHandler {
*/
boolean transitionZookeeperOfflineToOpening(final String encodedName,
int versionOfOfflineNode) {
if (!isRegionStillOpening()) {
LOG.warn("Open region aborted since it isn't opening any more");
return false;
}
// TODO: should also handle transition from CLOSED?
try {
// Initialize the znode version.
@ -399,6 +421,10 @@ public class OpenRegionHandler extends EventHandler {
* @return True if successful transition.
*/
boolean tickleOpening(final String context) {
if (!isRegionStillOpening()) {
LOG.warn("Open region aborted since it isn't opening any more");
return false;
}
// If previous checks failed... do not try again.
if (!isGoodVersion()) return false;
String encodedName = this.regionInfo.getEncodedName();

View File

@ -19,14 +19,18 @@
package org.apache.hadoop.hbase.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A utility class to manage a set of locks. Each lock is identified by a String which serves
* as a key. Typical usage is: <p>
@ -44,7 +48,7 @@ import java.util.concurrent.locks.ReentrantLock;
* }
* </p>
*/
public class KeyLocker<K> {
public class KeyLocker<K extends Comparable<? super K>> {
private static final Log LOG = LogFactory.getLog(KeyLocker.class);
// The number of lock we want to easily support. It's not a maximum.
@ -78,6 +82,19 @@ public class KeyLocker<K> {
return lock.getFirst();
}
/**
* Acquire locks for a set of keys. The keys will be
* sorted internally to avoid possible deadlock.
*/
public Map<K, Lock> acquireLocks(final Set<K> keys) {
Map<K, Lock> locks = new HashMap<K, Lock>(keys.size());
SortedSet<K> sortedKeys = new TreeSet<K>(keys);
for (K key : sortedKeys) {
locks.put(key, acquireLock(key));
}
return locks;
}
/**
* Free the lock for the given key.
*/
@ -95,7 +112,9 @@ public class KeyLocker<K> {
}
}
static class KeyLock<K> extends ReentrantLock {
static class KeyLock<K extends Comparable<? super K>> extends ReentrantLock {
private static final long serialVersionUID = -12432857283423584L;
private final KeyLocker<K> locker;
private final K lockId;

View File

@ -133,7 +133,7 @@ public class ZKAssign {
*
* @param zkw zk reference
* @param region region to be created as offline
* @param serverName server event originates from
* @param serverName server transition will happen on
* @throws KeeperException if unexpected zookeeper exception
* @throws KeeperException.NodeExistsException if node already exists
*/
@ -163,7 +163,7 @@ public class ZKAssign {
*
* @param zkw zk reference
* @param region region to be created as offline
* @param serverName server event originates from
* @param serverName server transition will happen on
* @param cb
* @param ctx
* @throws KeeperException if unexpected zookeeper exception
@ -181,35 +181,6 @@ public class ZKAssign {
ZKUtil.asyncCreate(zkw, node, rt.toByteArray(), cb, ctx);
}
/**
* Forces an existing unassigned node to the OFFLINE state for the specified
* region.
*
* <p>Does not create a new node. If a node does not already exist for this
* region, a {@link NoNodeException} will be thrown.
*
* <p>Sets a watcher on the unassigned region node if the method is
* successful.
*
* <p>This method should only be used during recovery of regionserver failure.
*
* @param zkw zk reference
* @param region region to be forced as offline
* @param serverName server event originates from
* @throws KeeperException if unexpected zookeeper exception
* @throws KeeperException.NoNodeException if node does not exist
*/
public static void forceNodeOffline(ZooKeeperWatcher zkw, HRegionInfo region,
ServerName serverName)
throws KeeperException, KeeperException.NoNodeException {
LOG.debug(zkw.prefix("Forcing existing unassigned node for " +
region.getEncodedName() + " to OFFLINE state"));
RegionTransition rt =
RegionTransition.createRegionTransition(EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
String node = getNodeName(zkw, region.getEncodedName());
ZKUtil.setData(zkw, node, rt.toByteArray());
}
/**
* Creates or force updates an unassigned node to the OFFLINE state for the
* specified region.
@ -224,7 +195,7 @@ public class ZKAssign {
*
* @param zkw zk reference
* @param region region to be created as offline
* @param serverName server event originates from
* @param serverName server transition will happen on
* @return the version of the znode created in OFFLINE state, -1 if
* unsuccessful.
* @throws KeeperException if unexpected zookeeper exception
@ -232,76 +203,17 @@ public class ZKAssign {
*/
public static int createOrForceNodeOffline(ZooKeeperWatcher zkw,
HRegionInfo region, ServerName serverName) throws KeeperException {
return createOrForceNodeOffline(zkw, region, serverName, false, true);
}
/**
* Creates or force updates an unassigned node to the OFFLINE state for the
* specified region.
* <p>
* Attempts to create the node but if it exists will force it to transition to
* and OFFLINE state.
* <p>
* Sets a watcher on the unassigned region node if the method is successful.
*
* <p>
* This method should be used when assigning a region.
*
* @param zkw
* zk reference
* @param region
* region to be created as offline
* @param serverName
* server event originates from
* @param hijack
* - true if to be hijacked and reassigned, false otherwise
* @param allowCreation
* - true if the node has to be created newly, false otherwise
* @throws KeeperException
* if unexpected zookeeper exception
* @return the version of the znode created in OFFLINE state, -1 if
* unsuccessful.
* @throws KeeperException.NodeExistsException
* if node already exists
*/
public static int createOrForceNodeOffline(ZooKeeperWatcher zkw,
HRegionInfo region, ServerName serverName,
boolean hijack, boolean allowCreation)
throws KeeperException {
LOG.debug(zkw.prefix("Creating (or updating) unassigned node for " +
region.getEncodedName() + " with OFFLINE state"));
RegionTransition rt = RegionTransition.createRegionTransition(EventType.M_ZK_REGION_OFFLINE,
region.getRegionName(), serverName, HConstants.EMPTY_BYTE_ARRAY);
byte [] data = rt.toByteArray();
String node = getNodeName(zkw, region.getEncodedName());
Stat stat = new Stat();
zkw.sync(node);
int version = ZKUtil.checkExists(zkw, node);
if (version == -1) {
// While trying to transit a node to OFFLINE that was in previously in
// OPENING state but before it could transit to OFFLINE state if RS had
// opened the region then the Master deletes the assigned region znode.
// In that case the znode will not exist. So we should not
// create the znode again which will lead to double assignment.
if (hijack && !allowCreation) {
return -1;
}
return ZKUtil.createAndWatch(zkw, node, data);
} else {
byte [] curDataInZNode = ZKAssign.getDataNoWatch(zkw, region.getEncodedName(), stat);
RegionTransition curRt = getRegionTransition(curDataInZNode);
// Do not move the node to OFFLINE if znode is in any of the following
// state.
// Because these are already executed states.
if (hijack && curRt != null) {
EventType eventType = curRt.getEventType();
if (eventType.equals(EventType.M_ZK_REGION_CLOSING)
|| eventType.equals(EventType.RS_ZK_REGION_CLOSED)
|| eventType.equals(EventType.RS_ZK_REGION_OPENED)) {
return -1;
}
}
boolean setData = false;
try {
setData = ZKUtil.setData(zkw, node, data, version);
@ -327,7 +239,7 @@ public class ZKAssign {
}
}
}
return stat.getVersion() + 1;
return version + 1;
}
/**
@ -558,7 +470,7 @@ public class ZKAssign {
*
* @param zkw zk reference
* @param region region to be created as closing
* @param serverName server event originates from
* @param serverName server transition will happen on
* @return version of node after transition, -1 if unsuccessful transition
* @throws KeeperException if unexpected zookeeper exception
* @throws KeeperException.NodeExistsException if node already exists
@ -596,7 +508,7 @@ public class ZKAssign {
*
* @param zkw zk reference
* @param region region to be transitioned to closed
* @param serverName server event originates from
* @param serverName server transition happens on
* @return version of node after transition, -1 if unsuccessful transition
* @throws KeeperException if unexpected zookeeper exception
*/
@ -630,7 +542,7 @@ public class ZKAssign {
*
* @param zkw zk reference
* @param region region to be transitioned to opening
* @param serverName server event originates from
* @param serverName server transition happens on
* @return version of node after transition, -1 if unsuccessful transition
* @throws KeeperException if unexpected zookeeper exception
*/
@ -670,7 +582,7 @@ public class ZKAssign {
*
* @param zkw zk reference
* @param region region to be transitioned to opening
* @param serverName server event originates from
* @param serverName server transition happens on
* @return version of node after transition, -1 if unsuccessful transition
* @throws KeeperException if unexpected zookeeper exception
*/
@ -706,7 +618,7 @@ public class ZKAssign {
*
* @param zkw zk reference
* @param region region to be transitioned to opened
* @param serverName server event originates from
* @param serverName server transition happens on
* @return version of node after transition, -1 if unsuccessful transition
* @throws KeeperException if unexpected zookeeper exception
*/
@ -739,7 +651,7 @@ public class ZKAssign {
*
* @param zkw zk reference
* @param region region to be transitioned to opened
* @param serverName server event originates from
* @param serverName server transition happens on
* @param endState state to transition node to if all checks pass
* @param beginState state the node must currently be in to do transition
* @param expectedVersion expected version of data before modification, or -1

View File

@ -64,9 +64,13 @@ message GetOnlineRegionResponse {
}
message OpenRegionRequest {
repeated RegionInfo region = 1;
repeated RegionOpenInfo openInfo = 1;
message RegionOpenInfo {
required RegionInfo region = 1;
optional uint32 versionOfOfflineNode = 2;
}
}
message OpenRegionResponse {
repeated RegionOpeningState openingState = 1;

View File

@ -62,7 +62,8 @@ message RegionTransition {
// Full regionname in bytes
required bytes regionName = 2;
required uint64 createTime = 3;
optional ServerName originServerName = 4;
// The region server where the transition will happen or is happening
required ServerName serverName = 4;
optional bytes payload = 5;
}

View File

@ -23,8 +23,7 @@ module Shell
def help
return <<-EOF
Assign a region. Use with caution. If region already assigned,
this command will just go ahead and reassign
the region anyways. For experts only.
this command will do a force reassign. For experts only.
EOF
end

View File

@ -192,6 +192,11 @@ public class TestAssignmentManager {
createRegionPlanAndBalance(am, SERVERNAME_A, SERVERNAME_B, REGIONINFO);
startFakeFailedOverMasterAssignmentManager(am, this.watcher);
while (!am.processRITInvoked) Thread.sleep(1);
// As part of the failover cleanup, the balancing region plan is removed.
// So a random server will be used to open the region. For testing purpose,
// let's assume it is going to open on server b:
am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_B));
// Now fake the region closing successfully over on the regionserver; the
// regionserver will have set the region in CLOSED state. This will
// trigger callback into AM. The below zk close call is from the RS close
@ -208,7 +213,7 @@ public class TestAssignmentManager {
assertNotSame(-1, versionid);
// This uglyness below is what the openregionhandler on RS side does.
versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO,
SERVERNAME_A, EventType.M_ZK_REGION_OFFLINE,
SERVERNAME_B, EventType.M_ZK_REGION_OFFLINE,
EventType.RS_ZK_REGION_OPENING, versionid);
assertNotSame(-1, versionid);
// Move znode from OPENING to OPENED as RS does on successful open.
@ -233,6 +238,11 @@ public class TestAssignmentManager {
createRegionPlanAndBalance(am, SERVERNAME_A, SERVERNAME_B, REGIONINFO);
startFakeFailedOverMasterAssignmentManager(am, this.watcher);
while (!am.processRITInvoked) Thread.sleep(1);
// As part of the failover cleanup, the balancing region plan is removed.
// So a random server will be used to open the region. For testing purpose,
// let's assume it is going to open on server b:
am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_B));
// Now fake the region closing successfully over on the regionserver; the
// regionserver will have set the region in CLOSED state. This will
// trigger callback into AM. The below zk close call is from the RS close
@ -250,7 +260,7 @@ public class TestAssignmentManager {
assertNotSame(-1, versionid);
// This uglyness below is what the openregionhandler on RS side does.
versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO,
SERVERNAME_A, EventType.M_ZK_REGION_OFFLINE,
SERVERNAME_B, EventType.M_ZK_REGION_OFFLINE,
EventType.RS_ZK_REGION_OPENING, versionid);
assertNotSame(-1, versionid);
// Move znode from OPENING to OPENED as RS does on successful open.
@ -275,6 +285,11 @@ public class TestAssignmentManager {
createRegionPlanAndBalance(am, SERVERNAME_A, SERVERNAME_B, REGIONINFO);
startFakeFailedOverMasterAssignmentManager(am, this.watcher);
while (!am.processRITInvoked) Thread.sleep(1);
// As part of the failover cleanup, the balancing region plan is removed.
// So a random server will be used to open the region. For testing purpose,
// let's assume it is going to open on server b:
am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_B));
// Now fake the region closing successfully over on the regionserver; the
// regionserver will have set the region in CLOSED state. This will
// trigger callback into AM. The below zk close call is from the RS close
@ -292,7 +307,7 @@ public class TestAssignmentManager {
assertNotSame(-1, versionid);
// This uglyness below is what the openregionhandler on RS side does.
versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO,
SERVERNAME_A, EventType.M_ZK_REGION_OFFLINE,
SERVERNAME_B, EventType.M_ZK_REGION_OFFLINE,
EventType.RS_ZK_REGION_OPENING, versionid);
assertNotSame(-1, versionid);
// Move znode from OPENING to OPENED as RS does on successful open.
@ -798,12 +813,11 @@ public class TestAssignmentManager {
EventType.RS_ZK_REGION_OPENING, version);
RegionTransition rt = RegionTransition.createRegionTransition(EventType.RS_ZK_REGION_OPENING,
REGIONINFO.getRegionName(), SERVERNAME_A, HConstants.EMPTY_BYTE_ARRAY);
Map<ServerName, List<HRegionInfo>> deadServers =
new HashMap<ServerName, List<HRegionInfo>>();
deadServers.put(SERVERNAME_A, null);
version = ZKAssign.getVersion(this.watcher, REGIONINFO);
Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(false);
am.getRegionStates().createRegionState(REGIONINFO);
am.gate.set(false);
am.processRegionsInTransition(rt, REGIONINFO, deadServers, version);
am.processRegionsInTransition(rt, REGIONINFO, version);
// Waiting for the assignment to get completed.
while (!am.gate.get()) {
Thread.sleep(10);
@ -1017,22 +1031,18 @@ public class TestAssignmentManager {
@Override
boolean processRegionInTransition(String encodedRegionName,
HRegionInfo regionInfo,
Map<ServerName, List<HRegionInfo>> deadServers)
throws KeeperException, IOException {
HRegionInfo regionInfo) throws KeeperException, IOException {
this.processRITInvoked = true;
return super.processRegionInTransition(encodedRegionName, regionInfo,
deadServers);
return super.processRegionInTransition(encodedRegionName, regionInfo);
}
@Override
public void assign(HRegionInfo region, boolean setOfflineInZK, boolean forceNewPlan,
boolean hijack) {
public void assign(HRegionInfo region, boolean setOfflineInZK, boolean forceNewPlan) {
if (enabling) {
assignmentCount++;
this.regionOnline(region, SERVERNAME_A);
} else {
super.assign(region, setOfflineInZK, forceNewPlan, hijack);
super.assign(region, setOfflineInZK, forceNewPlan);
this.gate.set(true);
}
}
@ -1097,5 +1107,4 @@ public class TestAssignmentManager {
t.start();
while (!t.isAlive()) Threads.sleep(1);
}
}

View File

@ -210,6 +210,7 @@ public class TestCloseRegionHandler {
// Create it OFFLINE node, which is what Master set before sending OPEN RPC
ZKAssign.createNodeOffline(server.getZooKeeper(), hri, server.getServerName());
OpenRegionHandler openHandler = new OpenRegionHandler(server, rss, hri, htd);
rss.getRegionsInTransitionInRS().put(hri.getEncodedNameAsBytes(), Boolean.TRUE);
openHandler.process();
// This parse is not used?
RegionTransition.parseFrom(ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName()));

View File

@ -114,9 +114,13 @@ public class TestOpenRegionHandler {
return region;
}
};
rss.getRegionsInTransitionInRS().put(
hri.getEncodedNameAsBytes(), Boolean.TRUE);
// Call process without first creating OFFLINE region in zk, see if
// exception or just quiet return (expected).
handler.process();
rss.getRegionsInTransitionInRS().put(
hri.getEncodedNameAsBytes(), Boolean.TRUE);
ZKAssign.createNodeOffline(server.getZooKeeper(), hri, server.getServerName());
// Call process again but this time yank the zk znode out from under it
// post OPENING; again will expect it to come back w/o NPE or exception.
@ -143,6 +147,8 @@ public class TestOpenRegionHandler {
return null;
}
};
rsServices.getRegionsInTransitionInRS().put(
TEST_HRI.getEncodedNameAsBytes(), Boolean.TRUE);
handler.process();
// Handler should have transitioned it to FAILED_OPEN
@ -168,6 +174,8 @@ public class TestOpenRegionHandler {
return false;
}
};
rsServices.getRegionsInTransitionInRS().put(
TEST_HRI.getEncodedNameAsBytes(), Boolean.TRUE);
handler.process();
// Handler should have transitioned it to FAILED_OPEN