HBASE-6272 In-memory region state is inconsistent

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1366438 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jxiang 2012-07-27 16:34:32 +00:00
parent b358b89351
commit 57afee6a6b
41 changed files with 1451 additions and 1202 deletions

View File

@ -20,7 +20,7 @@ limitations under the License.
<%import>
org.apache.hadoop.hbase.HRegionInfo;
org.apache.hadoop.hbase.master.AssignmentManager;
org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
org.apache.hadoop.hbase.master.RegionState;
org.apache.hadoop.conf.Configuration;
org.apache.hadoop.hbase.HBaseConfiguration;
org.apache.hadoop.hbase.HConstants;
@ -32,7 +32,8 @@ AssignmentManager assignmentManager;
int limit = 100;
</%args>
<%java>
Map<String, RegionState> rit = assignmentManager.copyRegionsInTransition();
Map<String, RegionState> rit = assignmentManager
.getRegionStates().getRegionsInTransition();
// process the map to find region in transition details
Configuration conf = HBaseConfiguration.create();
int ritThreshold = conf.getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);

View File

@ -20,19 +20,16 @@
package org.apache.hadoop.hbase;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.HashSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.LiveServerInfo;
@ -41,13 +38,7 @@ import org.apache.hadoop.hbase.protobuf.generated.FSProtos.HBaseVersionFileConte
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.VersionMismatchException;
import org.apache.hadoop.io.VersionedWritable;
import com.google.protobuf.ByteString;
@ -83,7 +74,6 @@ public class ClusterStatus extends VersionedWritable {
* <dt>3</dt> <dd>Added master and backupMasters</dd>
* </dl>
*/
private static final byte VERSION_MASTER_BACKUPMASTERS = 2;
private static final byte VERSION = 2;
private String hbaseVersion;

View File

@ -19,8 +19,6 @@
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -29,7 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class UnknownRegionException extends IOException {
public class UnknownRegionException extends RegionException {
private static final long serialVersionUID = 1968858760475205392L;
public UnknownRegionException(String regionName) {

View File

@ -1177,6 +1177,16 @@ public class HBaseAdmin implements Abortable, Closeable {
ProtobufUtil.closeRegion(admin, hri.getRegionName(), false);
}
/**
* Get all the online regions on a region server.
*/
public List<HRegionInfo> getOnlineRegions(
final ServerName sn) throws IOException {
AdminProtocol admin =
this.connection.getAdmin(sn.getHostname(), sn.getPort());
return ProtobufUtil.getOnlineRegions(admin);
}
/**
* Flush a table or an individual region.
* Asynchronous operation.

View File

@ -62,8 +62,7 @@ public class BulkReOpen extends BulkAssigner {
// add plans for the regions that need to be reopened
Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>();
for (HRegionInfo hri : hris) {
RegionPlan reOpenPlan = new RegionPlan(hri, null,
assignmentManager.getRegionServerOfRegion(hri));
RegionPlan reOpenPlan = assignmentManager.getRegionReopenPlan(hri);
plans.put(hri.getEncodedName(), reOpenPlan);
}
assignmentManager.addPlans(plans);

View File

@ -29,7 +29,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -55,10 +54,14 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterAdminProtocol;
import org.apache.hadoop.hbase.MasterMonitorProtocol;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.RegionServerStatusProtocol;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableNotDisabledException;
@ -77,12 +80,6 @@ import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.MasterMonitorProtocol;
import org.apache.hadoop.hbase.MasterAdminProtocol;
import org.apache.hadoop.hbase.RegionServerStatusProtocol;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
@ -102,40 +99,17 @@ import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.net.DNS;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest;
@ -152,10 +126,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableR
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableRequest;
@ -164,26 +134,53 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionRe
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.net.DNS;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
@ -205,6 +202,7 @@ import com.google.protobuf.ServiceException;
* @see Watcher
*/
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public class HMaster extends HasThread
implements MasterMonitorProtocol, MasterAdminProtocol, RegionServerStatusProtocol, MasterServices,
Server {
@ -505,7 +503,7 @@ Server {
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
this.assignmentManager = new AssignmentManager(this, serverManager,
this.catalogTracker, this.balancer, this.executorService, this.metrics);
this.catalogTracker, this.balancer, this.executorService, this.metrics);
zooKeeper.registerListenerFirst(assignmentManager);
this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
@ -649,7 +647,7 @@ Server {
}
}
if (!masterRecovery) {
if (!masterRecovery) {
this.assignmentManager.startTimeOutMonitor();
}
// TODO: Should do this in background rather than block master startup
@ -768,6 +766,8 @@ Server {
// Work on ROOT region. Is it in zk in transition?
status.setStatus("Assigning ROOT region");
assignmentManager.getRegionStates().createRegionState(
HRegionInfo.ROOT_REGIONINFO);
boolean rit = this.assignmentManager.
processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.ROOT_REGIONINFO);
ServerName currentRootServer = null;
@ -802,6 +802,8 @@ Server {
// Work on meta region
status.setStatus("Assigning META region");
assignmentManager.getRegionStates().createRegionState(
HRegionInfo.FIRST_META_REGIONINFO);
rit = this.assignmentManager.
processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
boolean metaRegionLocation = this.catalogTracker.verifyMetaRegionLocation(timeout);
@ -1228,12 +1230,12 @@ Server {
boolean balancerRan;
synchronized (this.balancer) {
// Only allow one balance run at at time.
if (this.assignmentManager.isRegionsInTransition()) {
LOG.debug("Not running balancer because " +
this.assignmentManager.getRegionsInTransition().size() +
" region(s) in transition: " +
org.apache.commons.lang.StringUtils.
abbreviate(this.assignmentManager.getRegionsInTransition().toString(), 256));
if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
Map<String, RegionState> regionsInTransition =
this.assignmentManager.getRegionStates().getRegionsInTransition();
LOG.debug("Not running balancer because " + regionsInTransition.size() +
" region(s) in transition: " + org.apache.commons.lang.StringUtils.
abbreviate(regionsInTransition.toString(), 256));
return false;
}
if (this.serverManager.areDeadServersInProgress()) {
@ -1255,7 +1257,7 @@ Server {
}
Map<String, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
this.assignmentManager.getAssignmentsByTable();
this.assignmentManager.getRegionStates().getAssignmentsByTable();
List<RegionPlan> plans = new ArrayList<RegionPlan>();
//Give the balancer the current cluster state.
@ -1373,23 +1375,24 @@ Server {
LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.ENCODED_REGION_NAME
+ " actual: " + type);
}
Pair<HRegionInfo, ServerName> p =
this.assignmentManager.getAssignment(encodedRegionName);
if (p == null) {
throw new ServiceException(
RegionState regionState = assignmentManager.getRegionStates().
getRegionState(Bytes.toString(encodedRegionName));
if (regionState == null) {
throw new ServiceException(
new UnknownRegionException(Bytes.toStringBinary(encodedRegionName)));
}
HRegionInfo hri = p.getFirst();
HRegionInfo hri = regionState.getRegion();
ServerName dest;
if (destServerName == null || destServerName.length == 0) {
LOG.info("Passed destination servername is null/empty so " +
"choosing a server at random");
final List<ServerName> destServers = this.serverManager.createDestinationServersList(
p.getSecond());
regionState.getServerName());
dest = balancer.randomAssignment(hri, destServers);
} else {
dest = new ServerName(Bytes.toString(destServerName));
if (dest.equals(p.getSecond())) {
if (dest.equals(regionState.getServerName())) {
LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
+ " because region already assigned to the same server " + dest + ".");
return mrr;
@ -1397,7 +1400,7 @@ Server {
}
// Now we can do the move
RegionPlan rp = new RegionPlan(p.getFirst(), p.getSecond(), dest);
RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
try {
if (this.cpHost != null) {
@ -1729,12 +1732,6 @@ Server {
}
}
public void clearFromTransition(HRegionInfo hri) {
if (this.assignmentManager.isRegionInTransition(hri) != null) {
this.assignmentManager.regionOffline(hri);
}
}
@Override
public GetClusterStatusResponse getClusterStatus(RpcController controller, GetClusterStatusRequest req)
throws ServiceException {
@ -1787,7 +1784,7 @@ Server {
this.serverManager.getDeadServers(),
this.serverName,
backupMasters,
this.assignmentManager.copyRegionsInTransition(),
this.assignmentManager.getRegionStates().getRegionsInTransition(),
this.getCoprocessors(), this.balanceSwitch);
}
@ -1952,12 +1949,11 @@ Server {
public AssignmentManager getAssignmentManager() {
return this.assignmentManager;
}
public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
return rsFatals;
}
@SuppressWarnings("deprecation")
public void shutdown() {
if (cpHost != null) {
try {
@ -2082,17 +2078,16 @@ Server {
LOG.warn("assignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
+ " actual: " + type);
}
Pair<HRegionInfo, ServerName> pair =
MetaReader.getRegion(this.catalogTracker, regionName);
if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName));
HRegionInfo regionInfo = assignmentManager.getRegionStates().getRegionInfo(regionName);
if (regionInfo == null) throw new UnknownRegionException(Bytes.toString(regionName));
if (cpHost != null) {
if (cpHost.preAssign(pair.getFirst())) {
if (cpHost.preAssign(regionInfo)) {
return arr;
}
}
assignRegion(pair.getFirst());
assignRegion(regionInfo);
if (cpHost != null) {
cpHost.postAssign(pair.getFirst());
cpHost.postAssign(regionInfo);
}
return arr;
@ -2191,7 +2186,7 @@ Server {
* @return the average load
*/
public double getAverageLoad() {
return this.assignmentManager.getAverageLoad();
return this.assignmentManager.getRegionStates().getAverageLoad();
}
/**
@ -2255,7 +2250,6 @@ Server {
/**
* Register bean with platform management server
*/
@SuppressWarnings("deprecation")
void registerMBean() {
MXBeanImpl mxBeanInfo = MXBeanImpl.init(this);
MBeanUtil.registerMBean("Master", "Master", mxBeanInfo);

View File

@ -25,7 +25,6 @@ import java.util.Map.Entry;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
/**
* Impl for exposing HMaster Information through JMX
@ -99,8 +98,8 @@ public class MXBeanImpl implements MXBean {
public RegionsInTransitionInfo[] getRegionsInTransition() {
List<RegionsInTransitionInfo> info =
new ArrayList<RegionsInTransitionInfo>();
for (final Entry<String, RegionState> entry :
master.getAssignmentManager().copyRegionsInTransition().entrySet()) {
for (final Entry<String, RegionState> entry : master.getAssignmentManager()
.getRegionStates().getRegionsInTransition().entrySet()) {
RegionsInTransitionInfo innerinfo = new RegionsInTransitionInfo() {
@Override

View File

@ -24,17 +24,14 @@ import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.Date;
import java.util.Map;
import java.util.NavigableMap;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.monitoring.LogMonitoring;
import org.apache.hadoop.hbase.monitoring.StateDumpServlet;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@ -104,8 +101,8 @@ public class MasterDumpServlet extends StateDumpServlet {
private void dumpRIT(HMaster master, PrintWriter out) {
NavigableMap<String, RegionState> regionsInTransition =
master.getAssignmentManager().copyRegionsInTransition();
Map<String, RegionState> regionsInTransition =
master.getAssignmentManager().getRegionStates().getRegionsInTransition();
for (Map.Entry<String, RegionState> e : regionsInTransition.entrySet()) {
String rid = e.getKey();
RegionState rs = e.getValue();

View File

@ -25,9 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
/**
* Services Master supplies

View File

@ -1,123 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
/**
* <p>Encapsulate a ConcurrentSkipListMap to ensure that notifications are sent when
* the list is modified. Offers only the functions used by the AssignementManager, hence
* does not extends ConcurrentSkipListMap.</p>
*
* <p>Used only in master package (main & test), so it's package protected.</p>
*
* @param <K> - class for the keys
* @param <V> - class for the values
*/
class NotifiableConcurrentSkipListMap<K, V> {
private final ConcurrentSkipListMap<K, V> delegatee = new ConcurrentSkipListMap<K, V>();
public boolean isEmpty() {
return delegatee.isEmpty();
}
public int size() {
return delegatee.size();
}
public void put(K k, V v) {
synchronized (delegatee) {
delegatee.put(k, v);
delegatee.notifyAll();
}
}
public V remove(K k) {
synchronized (delegatee) {
V v = delegatee.remove(k);
if (v != null) {
delegatee.notifyAll();
}
return v;
}
}
public void waitForUpdate(long timeout) throws InterruptedException {
synchronized (delegatee){
delegatee.wait(timeout);
}
}
public boolean containsKey(K k) {
return delegatee.containsKey(k);
}
public Collection<?> keySet() {
return delegatee.keySet();
}
public V get(K k) {
return delegatee.get(k);
}
public NavigableMap<K, V> copyMap() {
return delegatee.clone();
}
public Collection<V> copyValues() {
Collection<V> values = new ArrayList<V>(size());
synchronized (delegatee) {
values.addAll(delegatee.values());
}
return values;
}
public Set<Map.Entry<K, V>> copyEntrySet() {
Set<Map.Entry<K, V>> entrySet = new TreeSet<Map.Entry<K, V>>();
synchronized (delegatee) {
Iterator<Map.Entry<K, V>> it = delegatee.entrySet().iterator();
while (it.hasNext()) {
entrySet.add(it.next());
}
}
return entrySet;
}
public void waitForUpdate() throws InterruptedException {
synchronized (delegatee) {
delegatee.wait();
}
}
public void clear() {
if (!delegatee.isEmpty()) {
synchronized (delegatee) {
delegatee.clear();
delegatee.notifyAll();
}
}
}
}

View File

@ -0,0 +1,259 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
/**
* State of a Region while undergoing transitions.
* Region state cannot be modified except the stamp field.
* So it is almost immutable.
*/
@InterfaceAudience.Private
public class RegionState implements org.apache.hadoop.io.Writable {
public enum State {
OFFLINE, // region is in an offline state
PENDING_OPEN, // sent rpc to server to open but has not begun
OPENING, // server has begun to open but not yet done
OPEN, // server opened region and updated meta
PENDING_CLOSE, // sent rpc to server to close but has not begun
CLOSING, // server has begun to close but not yet done
CLOSED, // server closed region and updated meta
SPLITTING, // server started split of a region
SPLIT // server completed split of a region
}
// Many threads can update the state at the stamp at the same time
private final AtomicLong stamp;
private HRegionInfo region;
private volatile ServerName serverName;
private volatile State state;
public RegionState() {
this.stamp = new AtomicLong(System.currentTimeMillis());
}
public RegionState(HRegionInfo region, State state) {
this(region, state, System.currentTimeMillis(), null);
}
public RegionState(HRegionInfo region,
State state, long stamp, ServerName serverName) {
this.region = region;
this.state = state;
this.stamp = new AtomicLong(stamp);
this.serverName = serverName;
}
public void updateTimestampToNow() {
this.stamp.set(System.currentTimeMillis());
}
public State getState() {
return state;
}
public long getStamp() {
return stamp.get();
}
public HRegionInfo getRegion() {
return region;
}
public ServerName getServerName() {
return serverName;
}
public boolean isClosing() {
return state == State.CLOSING;
}
public boolean isClosed() {
return state == State.CLOSED;
}
public boolean isPendingClose() {
return state == State.PENDING_CLOSE;
}
public boolean isOpening() {
return state == State.OPENING;
}
public boolean isOpened() {
return state == State.OPEN;
}
public boolean isPendingOpen() {
return state == State.PENDING_OPEN;
}
public boolean isOffline() {
return state == State.OFFLINE;
}
public boolean isSplitting() {
return state == State.SPLITTING;
}
public boolean isSplit() {
return state == State.SPLIT;
}
@Override
public String toString() {
return "{" + region.getRegionNameAsString()
+ " state=" + state
+ ", ts=" + stamp
+ ", server=" + serverName + "}";
}
/**
* A slower (but more easy-to-read) stringification
*/
public String toDescriptiveString() {
long lstamp = stamp.get();
long relTime = System.currentTimeMillis() - lstamp;
return region.getRegionNameAsString()
+ " state=" + state
+ ", ts=" + new Date(lstamp) + " (" + (relTime/1000) + "s ago)"
+ ", server=" + serverName;
}
/**
* Convert a RegionState to an HBaseProtos.RegionState
*
* @return the converted HBaseProtos.RegionState
*/
public ClusterStatusProtos.RegionState convert() {
ClusterStatusProtos.RegionState.Builder regionState = ClusterStatusProtos.RegionState.newBuilder();
ClusterStatusProtos.RegionState.State rs;
switch (regionState.getState()) {
case OFFLINE:
rs = ClusterStatusProtos.RegionState.State.OFFLINE;
break;
case PENDING_OPEN:
rs = ClusterStatusProtos.RegionState.State.PENDING_OPEN;
break;
case OPENING:
rs = ClusterStatusProtos.RegionState.State.OPENING;
break;
case OPEN:
rs = ClusterStatusProtos.RegionState.State.OPEN;
break;
case PENDING_CLOSE:
rs = ClusterStatusProtos.RegionState.State.PENDING_CLOSE;
break;
case CLOSING:
rs = ClusterStatusProtos.RegionState.State.CLOSING;
break;
case CLOSED:
rs = ClusterStatusProtos.RegionState.State.CLOSED;
break;
case SPLITTING:
rs = ClusterStatusProtos.RegionState.State.SPLITTING;
break;
case SPLIT:
rs = ClusterStatusProtos.RegionState.State.SPLIT;
break;
default:
throw new IllegalStateException("");
}
regionState.setRegionInfo(HRegionInfo.convert(region));
regionState.setState(rs);
regionState.setStamp(getStamp());
return regionState.build();
}
/**
* Convert a protobuf HBaseProtos.RegionState to a RegionState
*
* @return the RegionState
*/
public static RegionState convert(ClusterStatusProtos.RegionState proto) {
RegionState.State state;
switch (proto.getState()) {
case OFFLINE:
state = State.OFFLINE;
break;
case PENDING_OPEN:
state = State.PENDING_OPEN;
break;
case OPENING:
state = State.OPENING;
break;
case OPEN:
state = State.OPEN;
break;
case PENDING_CLOSE:
state = State.PENDING_CLOSE;
break;
case CLOSING:
state = State.CLOSING;
break;
case CLOSED:
state = State.CLOSED;
break;
case SPLITTING:
state = State.SPLITTING;
break;
case SPLIT:
state = State.SPLIT;
break;
default:
throw new IllegalStateException("");
}
return new RegionState(HRegionInfo.convert(proto.getRegionInfo()),state,proto.getStamp(),null);
}
/**
* @deprecated Writables are going away
*/
@Deprecated
@Override
public void readFields(DataInput in) throws IOException {
region = new HRegionInfo();
region.readFields(in);
state = State.valueOf(in.readUTF());
stamp.set(in.readLong());
}
/**
* @deprecated Writables are going away
*/
@Deprecated
@Override
public void write(DataOutput out) throws IOException {
region.write(out);
out.writeUTF(state.name());
out.writeLong(stamp.get());
}
}

View File

@ -0,0 +1,529 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
/**
* Region state accountant. It holds the states of all regions in the memory.
* In normal scenario, it should match the meta table and the true region states.
*
* This map is used by AssignmentManager to track region states.
*/
@InterfaceAudience.Private
public class RegionStates {
private static final Log LOG = LogFactory.getLog(RegionStates.class);
/**
* Regions currently in transition.
*/
final HashMap<String, RegionState> regionsInTransition;
/**
* Region encoded name to state map.
* All the regions should be in this map.
*/
private final Map<String, RegionState> regionStates;
/**
* Server to regions assignment map.
* Contains the set of regions currently assigned to a given server.
*/
private final Map<ServerName, Set<HRegionInfo>> serverHoldings;
/**
* Region to server assignment map.
* Contains the server a given region is currently assigned to.
*/
private final TreeMap<HRegionInfo, ServerName> regionAssignments;
private final ServerManager serverManager;
private final Server server;
RegionStates(final Server master, final ServerManager serverManager) {
regionStates = new HashMap<String, RegionState>();
regionsInTransition = new HashMap<String, RegionState>();
serverHoldings = new HashMap<ServerName, Set<HRegionInfo>>();
regionAssignments = new TreeMap<HRegionInfo, ServerName>();
this.serverManager = serverManager;
this.server = master;
}
/**
* @return an unmodifiable the region assignment map
*/
@SuppressWarnings("unchecked")
public synchronized Map<HRegionInfo, ServerName> getRegionAssignments() {
return (Map<HRegionInfo, ServerName>)regionAssignments.clone();
}
public synchronized ServerName getRegionServerOfRegion(HRegionInfo hri) {
return regionAssignments.get(hri);
}
/**
* Get regions in transition and their states
*/
@SuppressWarnings("unchecked")
public synchronized Map<String, RegionState> getRegionsInTransition() {
return (Map<String, RegionState>)regionsInTransition.clone();
}
/**
* @return True if specified region in transition.
*/
public synchronized boolean isRegionInTransition(final HRegionInfo hri) {
return regionsInTransition.containsKey(hri.getEncodedName());
}
/**
* @return True if specified region in transition.
*/
public synchronized boolean isRegionInTransition(final String regionName) {
return regionsInTransition.containsKey(regionName);
}
/**
* @return True if any region in transition.
*/
public synchronized boolean isRegionsInTransition() {
return !regionsInTransition.isEmpty();
}
/**
* @return True if specified region assigned.
*/
public synchronized boolean isRegionAssigned(final HRegionInfo hri) {
return regionAssignments.containsKey(hri);
}
/**
* Wait for the state map to be updated by assignment manager.
*/
public synchronized void waitForUpdate(
final long timeout) throws InterruptedException {
this.wait(timeout);
}
/**
* Get region transition state
*/
public synchronized RegionState
getRegionTransitionState(final HRegionInfo hri) {
return regionsInTransition.get(hri.getEncodedName());
}
/**
* Get region transition state
*/
public synchronized RegionState
getRegionTransitionState(final String regionName) {
return regionsInTransition.get(regionName);
}
/**
* Add a list of regions to RegionStates. The initial state is OFFLINE.
* If any region is already in RegionStates, that region will be skipped.
*/
public synchronized void createRegionStates(
final List<HRegionInfo> hris) {
for (HRegionInfo hri: hris) {
createRegionState(hri);
}
}
/**
* Add a region to RegionStates. The initial state is OFFLINE.
* If it is already in RegionStates, this call has no effect,
* and the original state is returned.
*/
public synchronized RegionState createRegionState(final HRegionInfo hri) {
String regionName = hri.getEncodedName();
RegionState regionState = regionStates.get(regionName);
if (regionState != null) {
LOG.warn("Tried to create a state of a region already in RegionStates "
+ hri + ", used existing state: " + regionState
+ ", ignored new state: state=OFFLINE, server=null");
} else {
regionState = new RegionState(hri, State.OFFLINE);
regionStates.put(regionName, regionState);
}
return regionState;
}
/**
* Update a region state. If it is not splitting,
* it will be put in transition if not already there.
*/
public synchronized RegionState updateRegionState(
final HRegionInfo hri, final State state) {
RegionState regionState = regionStates.get(hri.getEncodedName());
ServerName serverName = (regionState == null || state == State.CLOSED
|| state == State.OFFLINE) ? null : regionState.getServerName();
return updateRegionState(hri, state, serverName);
}
/**
* Update a region state. If it is not splitting,
* it will be put in transition if not already there.
*/
public synchronized RegionState updateRegionState(
final HRegionInfo hri, final State state, final ServerName serverName) {
return updateRegionState(hri, state, System.currentTimeMillis(), serverName);
}
/**
* Update a region state. If it is not splitting,
* it will be put in transition if not already there.
*
* If we can't find the region info based on the region name in
* the transition, log a warning and return null.
*/
public synchronized RegionState updateRegionState(
final RegionTransition transition, final State state) {
byte[] regionName = transition.getRegionName();
HRegionInfo regionInfo = getRegionInfo(regionName);
if (regionInfo == null) {
String prettyRegionName = HRegionInfo.prettyPrint(
HRegionInfo.encodeRegionName(regionName));
LOG.warn("Failed to find region " + prettyRegionName
+ " in updating its state to " + state
+ " based on region transition " + transition);
return null;
}
return updateRegionState(regionInfo, state,
transition.getCreateTime(), transition.getServerName());
}
/**
* Update a region state. If it is not splitting,
* it will be put in transition if not already there.
*/
public synchronized RegionState updateRegionState(final HRegionInfo hri,
final State state, final long stamp, final ServerName serverName) {
ServerName newServerName = serverName;
if (serverName != null &&
(state == State.CLOSED || state == State.OFFLINE)) {
LOG.warn("Closed region " + hri + " still on "
+ serverName + "? Ignored, reset it to null");
newServerName = null;
}
String regionName = hri.getEncodedName();
RegionState regionState = new RegionState(hri, state, stamp, newServerName);
RegionState oldState = regionStates.put(regionName, regionState);
LOG.info("Region " + hri + " transitioned from " + oldState + " to " + regionState);
if (state != State.SPLITTING && (newServerName != null
|| (state != State.PENDING_CLOSE && state != State.CLOSING))) {
regionsInTransition.put(regionName, regionState);
}
// notify the change
this.notifyAll();
return regionState;
}
/**
* A region is online, won't be in transition any more.
* We can't confirm it is really online on specified region server
* because it hasn't been put in region server's online region list yet.
*/
public synchronized void regionOnline(
final HRegionInfo hri, final ServerName serverName) {
String regionName = hri.getEncodedName();
RegionState oldState = regionStates.get(regionName);
if (oldState == null) {
LOG.warn("Online a region not in RegionStates: " + hri);
} else {
State state = oldState.getState();
ServerName sn = oldState.getServerName();
if (state != State.OPEN || sn == null || !sn.equals(serverName)) {
LOG.debug("Online a region with current state=" + state + ", expected state=OPEN"
+ ", assigned to server: " + sn + " expected " + serverName);
}
}
updateRegionState(hri, State.OPEN, serverName);
regionsInTransition.remove(regionName);
ServerName oldServerName = regionAssignments.put(hri, serverName);
if (!serverName.equals(oldServerName)) {
LOG.info("Onlined region " + hri + " on " + serverName);
Set<HRegionInfo> regions = serverHoldings.get(serverName);
if (regions == null) {
regions = new HashSet<HRegionInfo>();
serverHoldings.put(serverName, regions);
}
regions.add(hri);
if (oldServerName != null) {
LOG.info("Offlined region " + hri + " from " + oldServerName);
serverHoldings.get(oldServerName).remove(hri);
}
}
}
/**
* A region is offline, won't be in transition any more.
*/
public synchronized void regionOffline(final HRegionInfo hri) {
String regionName = hri.getEncodedName();
RegionState oldState = regionStates.get(regionName);
if (oldState == null) {
LOG.warn("Offline a region not in RegionStates: " + hri);
} else {
State state = oldState.getState();
ServerName sn = oldState.getServerName();
if (state != State.OFFLINE || sn != null) {
LOG.debug("Online a region with current state=" + state + ", expected state=OFFLINE"
+ ", assigned to server: " + sn + ", expected null");
}
}
updateRegionState(hri, State.OFFLINE);
regionsInTransition.remove(regionName);
ServerName oldServerName = regionAssignments.remove(hri);
if (oldServerName != null) {
LOG.info("Offlined region " + hri + " from " + oldServerName);
serverHoldings.get(oldServerName).remove(hri);
}
}
/**
* A server is offline, all regions on it are dead.
*/
public synchronized List<RegionState> serverOffline(final ServerName sn) {
// Clean up this server from map of servers to regions, and remove all regions
// of this server from online map of regions.
List<RegionState> rits = new ArrayList<RegionState>();
Set<HRegionInfo> assignedRegions = serverHoldings.get(sn);
if (assignedRegions == null || assignedRegions.isEmpty()) {
// No regions on this server, we are done, return empty list of RITs
return rits;
}
for (HRegionInfo region : assignedRegions) {
regionAssignments.remove(region);
}
// See if any of the regions that were online on this server were in RIT
// If they are, normal timeouts will deal with them appropriately so
// let's skip a manual re-assignment.
for (RegionState state : regionsInTransition.values()) {
if (assignedRegions.contains(state.getRegion())) {
rits.add(state);
}
}
assignedRegions.clear();
this.notifyAll();
return rits;
}
/**
* Gets the online regions of the specified table.
* This method looks at the in-memory state. It does not go to <code>.META.</code>.
* Only returns <em>online</em> regions. If a region on this table has been
* closed during a disable, etc., it will be included in the returned list.
* So, the returned list may not necessarily be ALL regions in this table, its
* all the ONLINE regions in the table.
* @param tableName
* @return Online regions from <code>tableName</code>
*/
public synchronized List<HRegionInfo> getRegionsOfTable(byte[] tableName) {
List<HRegionInfo> tableRegions = new ArrayList<HRegionInfo>();
// boundary needs to have table's name but regionID 0 so that it is sorted
// before all table's regions.
HRegionInfo boundary = new HRegionInfo(tableName, null, null, false, 0L);
for (HRegionInfo hri: regionAssignments.tailMap(boundary).keySet()) {
if(Bytes.equals(hri.getTableName(), tableName)) {
tableRegions.add(hri);
} else {
break;
}
}
return tableRegions;
}
/**
* Wait on region to clear regions-in-transition.
* <p>
* If the region isn't in transition, returns immediately. Otherwise, method
* blocks until the region is out of transition.
*/
public synchronized void waitOnRegionToClearRegionsInTransition(
final HRegionInfo hri) throws InterruptedException {
if (!isRegionInTransition(hri)) return;
while(!server.isStopped() && isRegionInTransition(hri)) {
RegionState rs = getRegionState(hri);
LOG.info("Waiting on " + rs + " to clear regions-in-transition");
waitForUpdate(100);
}
if (server.isStopped()) {
LOG.info("Giving up wait on region in " +
"transition because stoppable.isStopped is set");
}
}
/**
* Waits until the specified region has completed assignment.
* <p>
* If the region is already assigned, returns immediately. Otherwise, method
* blocks until the region is assigned.
*/
public synchronized void waitForAssignment(
final HRegionInfo hri) throws InterruptedException {
if (!isRegionAssigned(hri)) return;
while(!server.isStopped() && !isRegionAssigned(hri)) {
RegionState rs = getRegionState(hri);
LOG.info("Waiting on " + rs + " to be assigned");
waitForUpdate(100);
}
if (server.isStopped()) {
LOG.info("Giving up wait on region " +
"assignment because stoppable.isStopped is set");
}
}
/**
* Compute the average load across all region servers.
* Currently, this uses a very naive computation - just uses the number of
* regions being served, ignoring stats about number of requests.
* @return the average load
*/
protected synchronized double getAverageLoad() {
int numServers = 0, totalLoad = 0;
for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
Set<HRegionInfo> regions = e.getValue();
ServerName serverName = e.getKey();
int regionCount = regions.size();
if (regionCount > 0 || serverManager.isServerOnline(serverName)) {
totalLoad += regionCount;
numServers++;
}
}
return numServers == 0 ? 0.0 :
(double)totalLoad / (double)numServers;
}
/**
* This is an EXPENSIVE clone. Cloning though is the safest thing to do.
* Can't let out original since it can change and at least the load balancer
* wants to iterate this exported list. We need to synchronize on regions
* since all access to this.servers is under a lock on this.regions.
*
* @return A clone of current assignments by table.
*/
protected Map<String, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable() {
Map<String, Map<ServerName, List<HRegionInfo>>> result =
new HashMap<String, Map<ServerName,List<HRegionInfo>>>();
synchronized (this) {
if (!server.getConfiguration().getBoolean("hbase.master.loadbalance.bytable", true)) {
Map<ServerName, List<HRegionInfo>> svrToRegions =
new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
svrToRegions.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
}
result.put("ensemble", svrToRegions);
} else {
for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
for (HRegionInfo hri: e.getValue()) {
if (hri.isMetaRegion() || hri.isRootRegion()) continue;
String tablename = hri.getTableNameAsString();
Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
if (svrToRegions == null) {
svrToRegions = new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
result.put(tablename, svrToRegions);
}
List<HRegionInfo> regions = svrToRegions.get(e.getKey());
if (regions == null) {
regions = new ArrayList<HRegionInfo>();
svrToRegions.put(e.getKey(), regions);
}
regions.add(hri);
}
}
}
}
Map<ServerName, ServerLoad>
onlineSvrs = serverManager.getOnlineServers();
// Take care of servers w/o assignments.
for (Map<ServerName, List<HRegionInfo>> map: result.values()) {
for (ServerName svr: onlineSvrs.keySet()) {
if (!map.containsKey(svr)) {
map.put(svr, new ArrayList<HRegionInfo>());
}
}
}
return result;
}
protected synchronized RegionState getRegionState(final HRegionInfo hri) {
return regionStates.get(hri.getEncodedName());
}
protected synchronized RegionState getRegionState(final String regionName) {
return regionStates.get(regionName);
}
/**
* Get the HRegionInfo from cache, if not there, from the META table
* @param regionName
* @return HRegionInfo for the region
*/
protected HRegionInfo getRegionInfo(final byte [] regionName) {
String encodedName = HRegionInfo.encodeRegionName(regionName);
RegionState regionState = regionStates.get(encodedName);
if (regionState != null) {
return regionState.getRegion();
}
try {
Pair<HRegionInfo, ServerName> p =
MetaReader.getRegion(server.getCatalogTracker(), regionName);
return p == null ? null : p.getFirst();
} catch (IOException e) {
server.abort("Aborting because error occoured while reading " +
Bytes.toStringBinary(regionName) + " from .META.", e);
return null;
}
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.RegionState;
/**
* Handles CLOSED region event on Master.
@ -99,9 +100,8 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf
return;
}
// ZK Node is in CLOSED state, assign it.
// TODO: Should we remove the region from RIT too? We don't? Makes for
// a 'forcing' log message when we go to update state from CLOSED to OFFLINE
assignmentManager.setOffline(regionInfo);
assignmentManager.getRegionStates().updateRegionState(
regionInfo, RegionState.State.CLOSED, null);
// This below has to do w/ online enable/disable of a table
assignmentManager.removeClosedRegion(regionInfo);
assignmentManager.assign(regionInfo, true);

View File

@ -176,8 +176,9 @@ public class CreateTableHandler extends EventHandler {
// 4. Trigger immediate assignment of the regions in round-robin fashion
List<ServerName> servers = serverManager.createDestinationServersList();
try {
this.assignmentManager.assignUserRegions(Arrays.asList(newRegions),
servers);
List<HRegionInfo> regions = Arrays.asList(newRegions);
assignmentManager.getRegionStates().createRegionStates(regions);
assignmentManager.assignUserRegions(regions, servers);
} catch (InterruptedException ie) {
LOG.error("Caught " + ie + " during round-robin assignment");
throw new IOException(ie);

View File

@ -62,12 +62,12 @@ public class DeleteTableHandler extends TableEventHandler {
for (HRegionInfo region : regions) {
long done = System.currentTimeMillis() + waitTime;
while (System.currentTimeMillis() < done) {
AssignmentManager.RegionState rs = am.isRegionInTransition(region);
if (rs == null) break;
if (!am.getRegionStates().isRegionInTransition(region)) break;
Threads.sleep(waitingTimeForEvents);
LOG.debug("Waiting on region to clear regions in transition; " + rs);
LOG.debug("Waiting on region to clear regions in transition; "
+ am.getRegionStates().getRegionTransitionState(region));
}
if (am.isRegionInTransition(region) != null) {
if (am.getRegionStates().isRegionInTransition(region)) {
throw new IOException("Waited hbase.master.wait.on.region (" +
waitTime + "ms) for region to leave region " +
region.getRegionNameAsString() + " in transitions");

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.BulkAssigner;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
@ -124,8 +125,8 @@ public class DisableTableHandler extends EventHandler {
// already closed will not be included in this list; i.e. the returned
// list is not ALL regions in a table, its all online regions according
// to the in-memory state on this master.
final List<HRegionInfo> regions =
this.assignmentManager.getRegionsOfTable(tableName);
final List<HRegionInfo> regions = this.assignmentManager
.getRegionStates().getRegionsOfTable(tableName);
if (regions.size() == 0) {
done = true;
break;
@ -162,8 +163,9 @@ public class DisableTableHandler extends EventHandler {
@Override
protected void populatePool(ExecutorService pool) {
RegionStates regionStates = assignmentManager.getRegionStates();
for (HRegionInfo region: regions) {
if (assignmentManager.isRegionInTransition(region) != null) continue;
if (regionStates.isRegionInTransition(region)) continue;
final HRegionInfo hri = region;
pool.execute(new Runnable() {
public void run() {
@ -181,7 +183,7 @@ public class DisableTableHandler extends EventHandler {
List<HRegionInfo> regions = null;
while (!server.isStopped() && remaining > 0) {
Thread.sleep(waitingTimeForEvents);
regions = assignmentManager.getRegionsOfTable(tableName);
regions = assignmentManager.getRegionStates().getRegionsOfTable(tableName);
if (regions.isEmpty()) break;
remaining = timeout - (System.currentTimeMillis() - startTime);
}

View File

@ -167,7 +167,8 @@ public class EnableTableHandler extends EventHandler {
final List<HRegionInfo> regionsInMeta)
throws IOException {
final List<HRegionInfo> onlineRegions =
this.assignmentManager.getRegionsOfTable(tableName);
this.assignmentManager.getRegionStates()
.getRegionsOfTable(tableName);
regionsInMeta.removeAll(onlineRegions);
return regionsInMeta;
}
@ -194,7 +195,8 @@ public class EnableTableHandler extends EventHandler {
if (!roundRobinAssignment) {
for (HRegionInfo region : regions) {
if (assignmentManager.isRegionInTransition(region) != null) {
if (assignmentManager.getRegionStates()
.isRegionInTransition(region)) {
continue;
}
final HRegionInfo hri = region;
@ -223,7 +225,8 @@ public class EnableTableHandler extends EventHandler {
int lastNumberOfRegions = 0;
while (!server.isStopped() && remaining > 0) {
Thread.sleep(waitingTimeForEvents);
regions = assignmentManager.getRegionsOfTable(tableName);
regions = assignmentManager.getRegionStates()
.getRegionsOfTable(tableName);
if (isDone(regions)) break;
// Punt on the timeout as long we make progress

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.zookeeper.KeeperException;
@ -98,7 +98,8 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf
public void process() {
// Code to defend against case where we get SPLIT before region open
// processing completes; temporary till we make SPLITs go via zk -- 0.92.
RegionState regionState = this.assignmentManager.isRegionInTransition(regionInfo);
RegionState regionState = this.assignmentManager.getRegionStates()
.getRegionTransitionState(regionInfo.getEncodedName());
boolean openedNodeDeleted = false;
if (regionState != null
&& regionState.getState().equals(RegionState.State.OPEN)) {

View File

@ -38,9 +38,9 @@ import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
import org.apache.hadoop.hbase.master.DeadServer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@ -289,12 +289,13 @@ public class ServerShutdownHandler extends EventHandler {
if (hris != null) {
List<HRegionInfo> toAssignRegions = new ArrayList<HRegionInfo>();
for (Map.Entry<HRegionInfo, Result> e: hris.entrySet()) {
RegionState rit = this.services.getAssignmentManager().isRegionInTransition(e.getKey());
RegionState rit = services.getAssignmentManager()
.getRegionStates().getRegionTransitionState(e.getKey());
if (processDeadRegion(e.getKey(), e.getValue(),
this.services.getAssignmentManager(),
this.server.getCatalogTracker())) {
ServerName addressFromAM = this.services.getAssignmentManager()
.getRegionServerOfRegion(e.getKey());
.getRegionStates().getRegionServerOfRegion(e.getKey());
if (rit != null && !rit.isClosing() && !rit.isPendingClose() && !rit.isSplitting()) {
// Skip regions that were in transition unless CLOSING or
// PENDING_CLOSE

View File

@ -105,7 +105,6 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.regionserver.wal.HLog;

View File

@ -3366,8 +3366,8 @@ public class HRegionServer implements ClientProtocol,
checkOpen();
requestCount.incrementAndGet();
List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
for (Map.Entry<String,HRegion> e: this.onlineRegions.entrySet()) {
list.add(e.getValue().getRegionInfo());
for (HRegion region: this.onlineRegions.values()) {
list.add(region.getRegionInfo());
}
Collections.sort(list);
return ResponseConverter.buildGetOnlineRegionResponse(list);

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;

View File

@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
@ -2109,6 +2110,27 @@ public class HBaseTestingUtility {
return hloc.getPort();
}
/**
* Due to async racing issue, a region may not be in
* the online region list of a region server yet, after
* the assignment znode is deleted and the new assignment
* is recorded in master.
*/
public void assertRegionOnServer(
final HRegionInfo hri, final ServerName server,
final long timeout) throws IOException, InterruptedException {
long timeoutTime = System.currentTimeMillis() + timeout;
while (true) {
List<HRegionInfo> regions = getHBaseAdmin().getOnlineRegions(server);
if (regions.contains(hri)) return;
long now = System.currentTimeMillis();
if (now > timeoutTime) break;
Thread.sleep(10);
}
fail("Could not find region " + hri.getRegionNameAsString()
+ " on server " + server);
}
public HRegion createTestRegion(String tableName, HColumnDescriptor hcd)
throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);

View File

@ -29,7 +29,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.master.ServerManager;
@ -223,8 +222,8 @@ public class TestDrainingServer {
Assert.assertEquals("Nothing should have happened here.", regionsOnDrainingServer,
drainingServer.getNumberOfOnlineRegions());
Assert.assertFalse("We should not have regions in transition here. List is: "+
master.getAssignmentManager().copyRegionsInTransition(),
master.getAssignmentManager().isRegionsInTransition() );
master.getAssignmentManager().getRegionStates().getRegionsInTransition(),
master.getAssignmentManager().getRegionStates().isRegionsInTransition());
// Kill a few regionservers.
for (int aborted = 0; aborted <= 2; aborted++) {
@ -274,7 +273,7 @@ public class TestDrainingServer {
}
while (TEST_UTIL.getMiniHBaseCluster().getMaster().
getAssignmentManager().isRegionsInTransition()) {
getAssignmentManager().getRegionStates().isRegionsInTransition()) {
}
}

View File

@ -39,9 +39,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

View File

@ -1456,10 +1456,10 @@ public class TestAdmin {
List<HRegionInfo> tableRegions = localAdmin.getTableRegions(tableName);
HRegionInfo hri = tableRegions.get(0);
AssignmentManager am = master.getAssignmentManager();
ServerName server = am.getRegionServerOfRegion(hri);
ServerName server = am.getRegionStates().getRegionServerOfRegion(hri);
localAdmin.move(hri.getEncodedNameAsBytes(), Bytes.toBytes(server.getServerName()));
assertEquals("Current region server and region server before move should be same.", server,
am.getRegionServerOfRegion(hri));
am.getRegionStates().getRegionServerOfRegion(hri));
}

View File

@ -159,7 +159,7 @@ public class TestHCM {
// We can wait for all regions to be onlines, that makes log reading easier when debugging
while (TEST_UTIL.getMiniHBaseCluster().getMaster().
getAssignmentManager().isRegionsInTransition()) {
getAssignmentManager().getRegionStates().isRegionsInTransition()) {
}
// Now moving the region to the second server

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@ -963,10 +964,10 @@ public class TestMasterObserver {
// wait for assignments to finish, if any
AssignmentManager mgr = master.getAssignmentManager();
Collection<AssignmentManager.RegionState> transRegions =
mgr.copyRegionsInTransition().values();
for (AssignmentManager.RegionState state : transRegions) {
mgr.waitOnRegionToClearRegionsInTransition(state.getRegion());
Collection<RegionState> transRegions =
mgr.getRegionStates().getRegionsInTransition().values();
for (RegionState state : transRegions) {
mgr.getRegionStates().waitOnRegionToClearRegionsInTransition(state.getRegion());
}
// move half the open regions from RS 0 to RS 1
@ -983,9 +984,9 @@ public class TestMasterObserver {
}
// wait for assignments to finish
transRegions = mgr.copyRegionsInTransition().values();
for (AssignmentManager.RegionState state : transRegions) {
mgr.waitOnRegionToClearRegionsInTransition(state.getRegion());
transRegions = mgr.getRegionStates().getRegionsInTransition().values();
for (RegionState state : transRegions) {
mgr.getRegionStates().waitOnRegionToClearRegionsInTransition(state.getRegion());
}
// now trigger a balance

View File

@ -414,7 +414,7 @@ public class TestHFileOutputFormat {
LOG.info("Changing regions in table");
admin.disableTable(table.getTableName());
while(util.getMiniHBaseCluster().getMaster().getAssignmentManager().
isRegionsInTransition()) {
getRegionStates().isRegionsInTransition()) {
Threads.sleep(200);
LOG.info("Waiting on table to finish disabling");
}

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -94,7 +93,8 @@ public class Mocking {
boolean wait = true;
while (wait) {
AssignmentManager.RegionState state = am.getRegionsInTransition().get(encodedName);
RegionState state = am.getRegionStates()
.getRegionsInTransition().get(encodedName);
if (state != null && state.isPendingOpen()){
wait = false;
} else {

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.RegionException;
import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerLoad;
@ -49,8 +50,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
@ -102,6 +102,7 @@ public class TestAssignmentManager {
private ServerManager serverManager;
private ZooKeeperWatcher watcher;
private LoadBalancer balancer;
private HMaster master;
@BeforeClass
public static void beforeClass() throws Exception {
@ -155,7 +156,10 @@ public class TestAssignmentManager {
Mockito.when(this.serverManager.sendRegionOpen(SERVERNAME_A, REGIONINFO, -1)).
thenReturn(RegionOpeningState.OPENED);
Mockito.when(this.serverManager.sendRegionOpen(SERVERNAME_B, REGIONINFO, -1)).
thenReturn(RegionOpeningState.OPENED);
thenReturn(RegionOpeningState.OPENED);
this.master = Mockito.mock(HMaster.class);
Mockito.when(this.master.getServerManager()).thenReturn(serverManager);
}
@After
@ -299,8 +303,9 @@ public class TestAssignmentManager {
}
}
private void createRegionPlanAndBalance(final AssignmentManager am,
final ServerName from, final ServerName to, final HRegionInfo hri) {
private void createRegionPlanAndBalance(
final AssignmentManager am, final ServerName from,
final ServerName to, final HRegionInfo hri) throws RegionException {
// Call the balance function but fake the region being online first at
// servername from.
am.regionOnline(hri, from);
@ -330,7 +335,7 @@ public class TestAssignmentManager {
.getConfiguration());
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server,
this.serverManager, ct, balancer, executor, null);
this.serverManager, ct, balancer, executor, null);
try {
// Make sure our new AM gets callbacks; once registered, can't unregister.
// Thats ok because we make a new zk watcher for each test.
@ -370,7 +375,7 @@ public class TestAssignmentManager {
ZKAssign.transitionNodeOpened(this.watcher, REGIONINFO, SERVERNAME_B, versionid);
assertNotSame(-1, versionid);
// Wait on the handler removing the OPENED znode.
while(am.isRegionInTransition(REGIONINFO) != null) Threads.sleep(1);
while(am.getRegionStates().isRegionInTransition(REGIONINFO)) Threads.sleep(1);
} finally {
executor.shutdown();
am.shutdown();
@ -397,7 +402,7 @@ public class TestAssignmentManager {
.getConfiguration());
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server,
this.serverManager, ct, balancer, executor, null);
this.serverManager, ct, balancer, executor, null);
try {
processServerShutdownHandler(ct, am, false);
} finally {
@ -448,15 +453,14 @@ public class TestAssignmentManager {
// We need a mocked catalog tracker.
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(server.getConfiguration());
// Create an AM.
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(this.server,
this.serverManager);
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
this.server, this.serverManager);
// adding region to regions and servers maps.
am.regionOnline(REGIONINFO, SERVERNAME_A);
// adding region in pending close.
am.regionsInTransition.put(REGIONINFO.getEncodedName(), new RegionState(REGIONINFO,
State.SPLITTING, System.currentTimeMillis(), SERVERNAME_A));
am.getRegionStates().updateRegionState(
REGIONINFO, State.SPLITTING, SERVERNAME_A);
am.getZKTable().setEnabledTable(REGIONINFO.getTableNameAsString());
RegionTransition data = RegionTransition.createRegionTransition(EventType.RS_ZK_REGION_SPLITTING,
REGIONINFO.getRegionName(), SERVERNAME_A);
@ -470,8 +474,8 @@ public class TestAssignmentManager {
// In both cases the znode should be deleted.
if (regionSplitDone) {
assertTrue("Region state of region in SPLITTING should be removed from rit.",
am.regionsInTransition.isEmpty());
assertFalse("Region state of region in SPLITTING should be removed from rit.",
am.getRegionStates().isRegionsInTransition());
} else {
while (!am.assignInvoked) {
Thread.sleep(1);
@ -497,13 +501,12 @@ public class TestAssignmentManager {
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(server.getConfiguration());
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server, this.serverManager, ct, balancer,
executor, null);
AssignmentManager am = new AssignmentManager(this.server,
this.serverManager, ct, balancer, executor, null);
// adding region to regions and servers maps.
am.regionOnline(REGIONINFO, SERVERNAME_A);
// adding region in pending close.
am.regionsInTransition.put(REGIONINFO.getEncodedName(), new RegionState(REGIONINFO,
State.PENDING_CLOSE));
am.getRegionStates().updateRegionState(REGIONINFO, State.PENDING_CLOSE);
if (state == Table.State.DISABLING) {
am.getZKTable().setDisablingTable(REGIONINFO.getTableNameAsString());
} else {
@ -526,8 +529,8 @@ public class TestAssignmentManager {
// assert will be true but the piece of code added for HBASE-5927 will not
// do that.
if (state == Table.State.DISABLED) {
assertTrue("Region state of region in pending close should be removed from rit.",
am.regionsInTransition.isEmpty());
assertFalse("Region state of region in pending close should be removed from rit.",
am.getRegionStates().isRegionsInTransition());
}
} finally {
am.setEnabledTable(REGIONINFO.getTableNameAsString());
@ -618,7 +621,7 @@ public class TestAssignmentManager {
.getConfiguration());
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server,
this.serverManager, ct, balancer, null, null);
this.serverManager, ct, balancer, null, null);
try {
// First make sure my mock up basically works. Unassign a region.
unassign(am, SERVERNAME_A, hri);
@ -636,7 +639,7 @@ public class TestAssignmentManager {
// This transition should fail if the znode has been messed with.
ZKAssign.transitionNode(this.watcher, hri, SERVERNAME_A,
EventType.RS_ZK_REGION_SPLITTING, EventType.RS_ZK_REGION_SPLITTING, version);
assertTrue(am.isRegionInTransition(hri) == null);
assertFalse(am.getRegionStates().isRegionInTransition(hri));
} finally {
am.shutdown();
}
@ -654,7 +657,7 @@ public class TestAssignmentManager {
final RecoverableZooKeeper recoverableZk = Mockito
.mock(RecoverableZooKeeper.class);
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
this.server, this.serverManager);
this.server, this.serverManager);
Watcher zkw = new ZooKeeperWatcher(HBaseConfiguration.create(), "unittest",
null) {
public RecoverableZooKeeper getRecoverableZooKeeper() {
@ -666,7 +669,7 @@ public class TestAssignmentManager {
.getChildren("/hbase/unassigned", zkw);
am.setWatcher((ZooKeeperWatcher) zkw);
try {
am.processDeadServersAndRegionsInTransition();
am.processDeadServersAndRegionsInTransition(null);
fail("Expected to abort");
} catch (NullPointerException e) {
fail("Should not throw NPE");
@ -678,7 +681,7 @@ public class TestAssignmentManager {
* TestCase verifies that the regionPlan is updated whenever a region fails to open
* and the master tries to process RS_ZK_FAILED_OPEN state.(HBASE-5546).
*/
@Test
@Test(timeout = 5000)
public void testRegionPlanIsUpdatedWhenRegionFailsToOpen() throws IOException, KeeperException,
ServiceException, InterruptedException {
this.server.getConfiguration().setClass(
@ -700,9 +703,8 @@ public class TestAssignmentManager {
EventType.M_ZK_REGION_OFFLINE, EventType.RS_ZK_REGION_FAILED_OPEN, v);
String path = ZKAssign.getNodeName(this.watcher, REGIONINFO
.getEncodedName());
RegionState state = new RegionState(REGIONINFO, State.OPENING, System
.currentTimeMillis(), SERVERNAME_A);
am.regionsInTransition.put(REGIONINFO.getEncodedName(), state);
am.getRegionStates().updateRegionState(
REGIONINFO, State.OPENING, SERVERNAME_A);
// a dummy plan inserted into the regionPlans. This plan is cleared and
// new one is formed
am.regionPlans.put(REGIONINFO.getEncodedName(), new RegionPlan(
@ -777,11 +779,11 @@ public class TestAssignmentManager {
* region which is in Opening state on a dead RS. Master should immediately
* assign the region and not wait for Timeout Monitor.(Hbase-5882).
*/
@Test
@Test(timeout = 5000)
public void testRegionInOpeningStateOnDeadRSWhileMasterFailover() throws IOException,
KeeperException, ServiceException, InterruptedException {
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(this.server,
this.serverManager);
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
this.server, this.serverManager);
ZKAssign.createNodeOffline(this.watcher, REGIONINFO, SERVERNAME_A);
int version = ZKAssign.getVersion(this.watcher, REGIONINFO);
ZKAssign.transitionNode(this.watcher, REGIONINFO, SERVERNAME_A, EventType.M_ZK_REGION_OFFLINE,
@ -810,7 +812,7 @@ public class TestAssignmentManager {
* @throws IOException
* @throws Exception
*/
@Test
@Test(timeout = 5000)
public void testDisablingTableRegionsAssignmentDuringCleanClusterStartup()
throws KeeperException, IOException, Exception {
this.server.getConfiguration().setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
@ -888,7 +890,7 @@ public class TestAssignmentManager {
}
private void unassign(final AssignmentManager am, final ServerName sn,
final HRegionInfo hri) {
final HRegionInfo hri) throws RegionException {
// Before I can unassign a region, I need to set it online.
am.regionOnline(hri, sn);
// Unassign region.
@ -905,8 +907,7 @@ public class TestAssignmentManager {
* @throws KeeperException
*/
private AssignmentManagerWithExtrasForTesting setUpMockedAssignmentManager(final Server server,
final ServerManager manager)
throws IOException, KeeperException, ServiceException {
final ServerManager manager) throws IOException, KeeperException, ServiceException {
// We need a mocked catalog tracker. Its used by our AM instance.
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
// Make an RS Interface implementation. Make it so a scanner can go against
@ -938,7 +939,7 @@ public class TestAssignmentManager {
ExecutorService executor = startupMasterExecutor("mockedAMExecutor");
this.balancer = LoadBalancerFactory.getLoadBalancer(server.getConfiguration());
AssignmentManagerWithExtrasForTesting am = new AssignmentManagerWithExtrasForTesting(
server, manager, ct, this.balancer, executor);
server, manager, ct, this.balancer, executor);
return am;
}
@ -954,10 +955,10 @@ public class TestAssignmentManager {
boolean assignInvoked = false;
AtomicBoolean gate = new AtomicBoolean(true);
public AssignmentManagerWithExtrasForTesting(final Server master,
final ServerManager serverManager, final CatalogTracker catalogTracker,
final LoadBalancer balancer, final ExecutorService service)
throws KeeperException, IOException {
public AssignmentManagerWithExtrasForTesting(
final Server master, final ServerManager serverManager,
final CatalogTracker catalogTracker, final LoadBalancer balancer,
final ExecutorService service) throws KeeperException, IOException {
super(master, serverManager, catalogTracker, balancer, service, null);
this.es = service;
this.ct = catalogTracker;
@ -987,12 +988,7 @@ public class TestAssignmentManager {
super.assign(region, setOfflineInZK, forceNewPlan, hijack);
this.gate.set(true);
}
@Override
public ServerName getRegionServerOfRegion(HRegionInfo hri) {
return SERVERNAME_A;
}
@Override
public void assign(java.util.List<HRegionInfo> regions, java.util.List<ServerName> servers)
{
@ -1037,7 +1033,7 @@ public class TestAssignmentManager {
// the RIT region to our RIT Map in AM at processRegionsInTransition.
// First clear any inmemory state from AM so it acts like a new master
// coming on line.
am.regionsInTransition.clear();
am.getRegionStates().regionsInTransition.clear();
am.regionPlans.clear();
try {
am.joinCluster();

View File

@ -0,0 +1,201 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* This tests AssignmentManager with a testing cluster.
*/
@Category(MediumTests.class)
public class TestAssignmentManagerOnCluster {
private final static byte[] FAMILY = Bytes.toBytes("FAMILY");
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static Configuration conf = TEST_UTIL.getConfiguration();
private static HBaseAdmin admin;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(3);
admin = TEST_UTIL.getHBaseAdmin();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* This tests region assignment
*/
@Test
public void testAssignRegion() throws Exception {
String table = "testAssignRegion";
try {
HTableDescriptor desc = new HTableDescriptor(table);
desc.addFamily(new HColumnDescriptor(FAMILY));
admin.createTable(desc);
HTable meta = new HTable(conf, HConstants.META_TABLE_NAME);
HRegionInfo hri = new HRegionInfo(
desc.getName(), Bytes.toBytes("A"), Bytes.toBytes("Z"));
Put put = new Put(hri.getRegionName());
put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
Writables.getBytes(hri));
meta.put(put);
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
master.assignRegion(hri);
master.getAssignmentManager().waitForAssignment(hri);
ServerName serverName = master.getAssignmentManager().
getRegionStates().getRegionServerOfRegion(hri);
TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
} finally {
TEST_UTIL.deleteTable(Bytes.toBytes(table));
}
}
/**
* This tests offlining a region
*/
@Test
public void testOfflineRegion() throws Exception {
String table = "testOfflineRegion";
try {
HRegionInfo hri = createTableAndGetOneRegion(table);
RegionStates regionStates = TEST_UTIL.getHBaseCluster().
getMaster().getAssignmentManager().getRegionStates();
ServerName serverName = regionStates.getRegionServerOfRegion(hri);
TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
admin.offline(hri.getRegionName());
long timeoutTime = System.currentTimeMillis() + 800;
while (true) {
List<HRegionInfo> regions =
regionStates.getRegionsOfTable(Bytes.toBytes(table));
if (!regions.contains(hri)) break;
long now = System.currentTimeMillis();
if (now > timeoutTime) {
fail("Failed to offline the region in time");
break;
}
Thread.sleep(10);
}
RegionState regionState = regionStates.getRegionState(hri);
assertTrue(regionState.isOffline());
} finally {
TEST_UTIL.deleteTable(Bytes.toBytes(table));
}
}
/**
* This tests moving a region
*/
@Test
public void testMoveRegion() throws Exception {
String table = "testMoveRegion";
try {
HRegionInfo hri = createTableAndGetOneRegion(table);
RegionStates regionStates = TEST_UTIL.getHBaseCluster().
getMaster().getAssignmentManager().getRegionStates();
ServerName serverName = regionStates.getRegionServerOfRegion(hri);
ServerName destServerName = null;
for (int i = 0; i < 3; i++) {
HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(i);
if (!destServer.getServerName().equals(serverName)) {
destServerName = destServer.getServerName();
break;
}
}
assertTrue(destServerName != null
&& !destServerName.equals(serverName));
TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
Bytes.toBytes(destServerName.getServerName()));
long timeoutTime = System.currentTimeMillis() + 800;
while (true) {
ServerName sn = regionStates.getRegionServerOfRegion(hri);
if (sn != null && sn.equals(destServerName)) {
TEST_UTIL.assertRegionOnServer(hri, sn, 200);
break;
}
long now = System.currentTimeMillis();
if (now > timeoutTime) {
fail("Failed to move the region in time");
}
regionStates.waitForUpdate(50);
}
} finally {
TEST_UTIL.deleteTable(Bytes.toBytes(table));
}
}
HRegionInfo createTableAndGetOneRegion(
final String tableName) throws IOException, InterruptedException {
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(FAMILY));
admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5);
// wait till the table is assigned
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
long timeoutTime = System.currentTimeMillis() + 100;
while (true) {
List<HRegionInfo> regions = master.getAssignmentManager().
getRegionStates().getRegionsOfTable(Bytes.toBytes(tableName));
if (regions.size() > 3) {
return regions.get(2);
}
long now = System.currentTimeMillis();
if (now > timeoutTime) {
fail("Could not find an online region");
}
Thread.sleep(10);
}
}
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}

View File

@ -78,7 +78,7 @@ public class TestMXBean {
info.getCoprocessors().length);
Assert.assertEquals(master.getServerManager().getOnlineServersList().size(),
info.getRegionServers().size());
Assert.assertEquals(master.getAssignmentManager().isRegionsInTransition(),
Assert.assertEquals(master.getAssignmentManager().getRegionStates().isRegionsInTransition(),
info.getRegionsInTransition().length > 0);
Assert.assertTrue(info.getRegionServers().size() == 4);

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@ -770,14 +769,14 @@ public class TestMasterFailover {
// PENDING_OPEN and enabled
region = enabledRegions.remove(0);
regionsThatShouldBeOnline.add(region);
master.assignmentManager.regionsInTransition.put(region.getEncodedName(),
new RegionState(region, RegionState.State.PENDING_OPEN, 0, null));
master.getAssignmentManager().getRegionStates().updateRegionState(
region, RegionState.State.PENDING_OPEN, null);
ZKAssign.createNodeOffline(zkw, region, master.getServerName());
// PENDING_OPEN and disabled
region = disabledRegions.remove(0);
regionsThatShouldBeOffline.add(region);
master.assignmentManager.regionsInTransition.put(region.getEncodedName(),
new RegionState(region, RegionState.State.PENDING_OPEN, 0, null));
master.getAssignmentManager().getRegionStates().updateRegionState(
region, RegionState.State.PENDING_OPEN, null);
ZKAssign.createNodeOffline(zkw, region, master.getServerName());
// This test is bad. It puts up a PENDING_CLOSE but doesn't say what
// server we were PENDING_CLOSE against -- i.e. an entry in
@ -808,7 +807,7 @@ public class TestMasterFailover {
final long maxTime = 120000;
boolean done = master.assignmentManager.waitUntilNoRegionsInTransition(maxTime);
if (!done) {
LOG.info("rit=" + master.assignmentManager.copyRegionsInTransition());
LOG.info("rit=" + master.getAssignmentManager().getRegionStates().getRegionsInTransition());
}
long elapsed = System.currentTimeMillis() - now;
assertTrue("Elapsed=" + elapsed + ", maxTime=" + maxTime + ", done=" + done,

View File

@ -33,7 +33,6 @@ import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.tmpl.master.AssignmentManagerStatusTmpl;
@ -66,11 +65,11 @@ public class TestMasterStatusServlet {
@Before
public void setupBasicMocks() {
conf = HBaseConfiguration.create();
master = Mockito.mock(HMaster.class);
Mockito.doReturn(FAKE_HOST).when(master).getServerName();
Mockito.doReturn(conf).when(master).getConfiguration();
// Fake serverManager
ServerManager serverManager = Mockito.mock(ServerManager.class);
Mockito.doReturn(1.0).when(serverManager).getAverageLoad();
@ -78,13 +77,15 @@ public class TestMasterStatusServlet {
// Fake AssignmentManager and RIT
AssignmentManager am = Mockito.mock(AssignmentManager.class);
RegionStates rs = Mockito.mock(RegionStates.class);
NavigableMap<String, RegionState> regionsInTransition =
Maps.newTreeMap();
regionsInTransition.put("r1",
new RegionState(FAKE_HRI, RegionState.State.CLOSING, 12345L, FAKE_HOST));
Mockito.doReturn(regionsInTransition).when(am).copyRegionsInTransition();
new RegionState(FAKE_HRI, RegionState.State.CLOSING, 12345L, FAKE_HOST));
Mockito.doReturn(rs).when(am).getRegionStates();
Mockito.doReturn(regionsInTransition).when(rs).getRegionsInTransition();
Mockito.doReturn(am).when(master).getAssignmentManager();
// Fake ZKW
ZooKeeperWatcher zkw = Mockito.mock(ZooKeeperWatcher.class);
Mockito.doReturn("fakequorum").when(zkw).getQuorum();
@ -93,7 +94,6 @@ public class TestMasterStatusServlet {
// Mock admin
admin = Mockito.mock(HBaseAdmin.class);
}
private void setupMockTables() throws IOException {
HTableDescriptor tables[] = new HTableDescriptor[] {
@ -153,7 +153,8 @@ public class TestMasterStatusServlet {
@Test
public void testAssignmentManagerTruncatedList() throws IOException {
AssignmentManager am = Mockito.mock(AssignmentManager.class);
RegionStates rs = Mockito.mock(RegionStates.class);
// Add 100 regions as in-transition
NavigableMap<String, RegionState> regionsInTransition =
Maps.newTreeMap();
@ -161,14 +162,15 @@ public class TestMasterStatusServlet {
HRegionInfo hri = new HRegionInfo(FAKE_TABLE.getName(),
new byte[]{i}, new byte[]{(byte) (i+1)});
regionsInTransition.put(hri.getEncodedName(),
new RegionState(hri, RegionState.State.CLOSING, 12345L, FAKE_HOST));
new RegionState(hri, RegionState.State.CLOSING, 12345L, FAKE_HOST));
}
// Add META in transition as well
regionsInTransition.put(
HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(),
new RegionState(HRegionInfo.FIRST_META_REGIONINFO,
RegionState.State.CLOSING, 12345L, FAKE_HOST));
Mockito.doReturn(regionsInTransition).when(am).copyRegionsInTransition();
Mockito.doReturn(rs).when(am).getRegionStates();
Mockito.doReturn(regionsInTransition).when(rs).getRegionsInTransition();
// Render to a string
StringWriter sw = new StringWriter();

View File

@ -32,7 +32,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@ -124,9 +123,12 @@ public class TestOpenedRegionHandler {
region = HRegion.createHRegion(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL.getConfiguration(), htd);
assertNotNull(region);
AssignmentManager am = Mockito.mock(AssignmentManager.class);
when(am.isRegionInTransition(hri)).thenReturn(
new RegionState(region.getRegionInfo(), RegionState.State.OPEN,
System.currentTimeMillis(), server.getServerName()));
RegionStates rsm = Mockito.mock(RegionStates.class);
Mockito.doReturn(rsm).when(am).getRegionStates();
when(rsm.isRegionInTransition(hri)).thenReturn(false);
when(rsm.getRegionState(hri)).thenReturn(
new RegionState(region.getRegionInfo(), RegionState.State.OPEN,
System.currentTimeMillis(), server.getServerName()));
// create a node with OPENED state
zkw = HBaseTestingUtility.createAndForceNodeToOpenedState(TEST_UTIL,
region, server.getServerName());

View File

@ -436,7 +436,7 @@ public class TestSplitTransactionOnCluster {
hri.setOffline(true);
hri.setSplit(true);
ServerName regionServerOfRegion = master.getAssignmentManager()
.getRegionServerOfRegion(hri);
.getRegionStates().getRegionServerOfRegion(hri);
assertTrue(regionServerOfRegion != null);
} finally {
@ -515,7 +515,7 @@ public class TestSplitTransactionOnCluster {
hri.setOffline(true);
hri.setSplit(true);
ServerName regionServerOfRegion = master.getAssignmentManager()
.getRegionServerOfRegion(hri);
.getRegionStates().getRegionServerOfRegion(hri);
assertTrue(regionServerOfRegion == null);
} finally {
// Set this flag back.

View File

@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@ -55,7 +54,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -212,8 +210,12 @@ public class TestWALReplay {
Bytes.toBytes(destServer.getServerName().getServerName()));
while (true) {
ServerName serverName = master.getAssignmentManager()
.getRegionServerOfRegion(destRegion.getRegionInfo());
if (serverName != null && serverName.equals(destServer.getServerName())) break;
.getRegionStates().getRegionServerOfRegion(destRegion.getRegionInfo());
if (serverName != null && serverName.equals(destServer.getServerName())) {
TEST_UTIL.assertRegionOnServer(
destRegion.getRegionInfo(), serverName, 200);
break;
}
Thread.sleep(10);
}
}
@ -709,12 +711,10 @@ public class TestWALReplay {
// Flusher used in this test. Keep count of how often we are called and
// actually run the flush inside here.
class TestFlusher implements FlushRequester {
private int count = 0;
private HRegion r;
@Override
public void requestFlush(HRegion region) {
count++;
try {
r.flushcache();
} catch (IOException e) {

View File

@ -63,12 +63,14 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.HBaseFsck.HbckInfo;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
import org.apache.hadoop.hbase.util.HBaseFsck.HbckInfo;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -88,7 +90,8 @@ public class TestHBaseFsck {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static Configuration conf = TEST_UTIL.getConfiguration();
private final static byte[] FAM = Bytes.toBytes("fam");
private final static int REGION_ONLINE_TIMEOUT = 300;
private final static int REGION_ONLINE_TIMEOUT = 800;
private static RegionStates regionStates;
// for the instance, reset every test run
private HTable tbl;
@ -103,6 +106,10 @@ public class TestHBaseFsck {
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, false);
TEST_UTIL.startMiniCluster(3);
AssignmentManager assignmentManager =
TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager();
regionStates = assignmentManager.getRegionStates();
}
@AfterClass
@ -406,6 +413,8 @@ public class TestHBaseFsck {
TEST_UTIL.getHBaseCluster().getMaster().assignRegion(hriDupe);
TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
.waitForAssignment(hriDupe);
ServerName server = regionStates.getRegionServerOfRegion(hriDupe);
TEST_UTIL.assertRegionOnServer(hriDupe, server, REGION_ONLINE_TIMEOUT);
HBaseFsck hbck = doFsck(conf, false);
assertErrors(hbck, new ERROR_CODE[] { ERROR_CODE.DUPE_STARTKEYS,
@ -482,6 +491,8 @@ public class TestHBaseFsck {
TEST_UTIL.getHBaseCluster().getMaster().assignRegion(hriDupe);
TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
.waitForAssignment(hriDupe);
ServerName server = regionStates.getRegionServerOfRegion(hriDupe);
TEST_UTIL.assertRegionOnServer(hriDupe, server, REGION_ONLINE_TIMEOUT);
// Yikes! The assignment manager can't tell between diff between two
// different regions with the same start/endkeys since it doesn't
@ -531,6 +542,8 @@ public class TestHBaseFsck {
TEST_UTIL.getHBaseCluster().getMaster().assignRegion(hriDupe);
TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
.waitForAssignment(hriDupe);
ServerName server = regionStates.getRegionServerOfRegion(hriDupe);
TEST_UTIL.assertRegionOnServer(hriDupe, server, REGION_ONLINE_TIMEOUT);
HBaseFsck hbck = doFsck(conf,false);
assertErrors(hbck, new ERROR_CODE[] { ERROR_CODE.DEGENERATE_REGION,
@ -568,6 +581,8 @@ public class TestHBaseFsck {
TEST_UTIL.getHBaseCluster().getMaster().assignRegion(hriOverlap);
TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
.waitForAssignment(hriOverlap);
ServerName server = regionStates.getRegionServerOfRegion(hriOverlap);
TEST_UTIL.assertRegionOnServer(hriOverlap, server, REGION_ONLINE_TIMEOUT);
HBaseFsck hbck = doFsck(conf, false);
assertErrors(hbck, new ERROR_CODE[] {
@ -702,6 +717,8 @@ public class TestHBaseFsck {
TEST_UTIL.getHBaseCluster().getMaster().assignRegion(hriOverlap);
TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
.waitForAssignment(hriOverlap);
ServerName server = regionStates.getRegionServerOfRegion(hriOverlap);
TEST_UTIL.assertRegionOnServer(hriOverlap, server, REGION_ONLINE_TIMEOUT);
HBaseFsck hbck = doFsck(conf, false);
assertErrors(hbck, new ERROR_CODE[] {
@ -739,6 +756,8 @@ public class TestHBaseFsck {
TEST_UTIL.getHBaseCluster().getMaster().assignRegion(hriOverlap);
TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
.waitForAssignment(hriOverlap);
ServerName server = regionStates.getRegionServerOfRegion(hriOverlap);
TEST_UTIL.assertRegionOnServer(hriOverlap, server, REGION_ONLINE_TIMEOUT);
HBaseFsck hbck = doFsck(conf, false);
assertErrors(hbck, new ERROR_CODE[] {
@ -1038,9 +1057,9 @@ public class TestHBaseFsck {
int iTimes = 0;
while (true) {
RegionTransition rt = RegionTransition.parseFrom(ZKAssign.getData(zkw,
region.getEncodedName()));
if (rt != null && rt.getEventType() == EventType.RS_ZK_REGION_OPENED) {
byte[] data = ZKAssign.getData(zkw, region.getEncodedName());
RegionTransition rt = data == null ? null : RegionTransition.parseFrom(data);
if (rt == null || rt.getEventType() == EventType.RS_ZK_REGION_OPENED) {
break;
}
Thread.sleep(100);