HBASE-6381 AssignmentManager should use the same logic for clean startup and failover

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1389561 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jxiang 2012-09-24 20:33:19 +00:00
parent e343aae374
commit d930324448
17 changed files with 872 additions and 951 deletions

View File

@ -0,0 +1,244 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
/**
* Run bulk assign. Does one RCP per regionserver passing a
* batch of regions using {@link SingleServerBulkAssigner}.
*/
@InterfaceAudience.Private
public class GeneralBulkAssigner extends BulkAssigner {
private static final Log LOG = LogFactory.getLog(GeneralBulkAssigner.class);
private Map<ServerName, List<HRegionInfo>> failedPlans
= new ConcurrentHashMap<ServerName, List<HRegionInfo>>();
private ExecutorService pool;
final Map<ServerName, List<HRegionInfo>> bulkPlan;
final AssignmentManager assignmentManager;
GeneralBulkAssigner(final Server server,
final Map<ServerName, List<HRegionInfo>> bulkPlan,
final AssignmentManager am) {
super(server);
this.bulkPlan = bulkPlan;
this.assignmentManager = am;
}
@Override
public boolean bulkAssign(boolean sync) throws InterruptedException,
IOException {
// Disable timing out regions in transition up in zk while bulk assigning.
this.assignmentManager.timeoutMonitor.bulkAssign(true);
try {
return super.bulkAssign(sync);
} finally {
// Re-enable timing out regions in transition up in zk.
this.assignmentManager.timeoutMonitor.bulkAssign(false);
}
}
@Override
protected String getThreadNamePrefix() {
return this.server.getServerName() + "-GeneralBulkAssigner";
}
@Override
protected void populatePool(ExecutorService pool) {
this.pool = pool; // shut it down later in case some assigner hangs
for (Map.Entry<ServerName, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
this.assignmentManager, this.failedPlans));
}
}
/**
*
* @param timeout How long to wait.
* @return true if done.
*/
@Override
protected boolean waitUntilDone(final long timeout)
throws InterruptedException {
Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
for (List<HRegionInfo> regionList : bulkPlan.values()) {
regionSet.addAll(regionList);
}
pool.shutdown(); // no more task allowed
int serverCount = bulkPlan.size();
int regionCount = regionSet.size();
long startTime = System.currentTimeMillis();
long rpcWaitTime = startTime + timeout;
while (!server.isStopped() && !pool.isTerminated()
&& rpcWaitTime > System.currentTimeMillis()) {
if (failedPlans.isEmpty()) {
pool.awaitTermination(100, TimeUnit.MILLISECONDS);
} else {
reassignFailedPlans();
}
}
if (!pool.isTerminated()) {
LOG.warn("bulk assigner is still running after "
+ (System.currentTimeMillis() - startTime) + "ms, shut it down now");
// some assigner hangs, can't wait any more, shutdown the pool now
List<Runnable> notStarted = pool.shutdownNow();
if (notStarted != null && !notStarted.isEmpty()) {
server.abort("some single server assigner hasn't started yet"
+ " when the bulk assigner timed out", null);
return false;
}
}
int reassigningRegions = 0;
if (!failedPlans.isEmpty() && !server.isStopped()) {
reassigningRegions = reassignFailedPlans();
}
Configuration conf = server.getConfiguration();
long perRegionOpenTimeGuesstimate =
conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
long endTime = Math.max(System.currentTimeMillis(), rpcWaitTime)
+ perRegionOpenTimeGuesstimate * (reassigningRegions + 1);
RegionStates regionStates = assignmentManager.getRegionStates();
// We're not synchronizing on regionsInTransition now because we don't use any iterator.
while (!regionSet.isEmpty() && !server.isStopped() && endTime > System.currentTimeMillis()) {
Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
while (regionInfoIterator.hasNext()) {
HRegionInfo hri = regionInfoIterator.next();
RegionState state = regionStates.getRegionState(hri);
if ((!regionStates.isRegionInTransition(hri) && regionStates.isRegionAssigned(hri))
|| state.isSplit() || state.isSplitting()) {
regionInfoIterator.remove();
}
}
if (!regionSet.isEmpty()) {
regionStates.waitForUpdate(100);
}
}
if (LOG.isDebugEnabled()) {
long elapsedTime = System.currentTimeMillis() - startTime;
String status = "successfully";
if (!regionSet.isEmpty()) {
status = "with " + regionSet.size() + " regions still not assigned yet";
}
LOG.debug("bulk assigning total " + regionCount + " regions to "
+ serverCount + " servers, took " + elapsedTime + "ms, " + status);
}
return regionSet.isEmpty();
}
@Override
protected long getTimeoutOnRIT() {
// Guess timeout. Multiply the max number of regions on a server
// by how long we think one region takes opening.
Configuration conf = server.getConfiguration();
long perRegionOpenTimeGuesstimate =
conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
int maxRegionsPerServer = 1;
for (List<HRegionInfo> regionList : bulkPlan.values()) {
int size = regionList.size();
if (size > maxRegionsPerServer) {
maxRegionsPerServer = size;
}
}
long timeout = perRegionOpenTimeGuesstimate * maxRegionsPerServer
+ conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000)
+ conf.getLong("hbase.bulk.assignment.perregionserver.rpc.waittime",
30000) * bulkPlan.size();
LOG.debug("Timeout-on-RIT=" + timeout);
return timeout;
}
@Override
protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
return new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.warn("Assigning regions in " + t.getName(), e);
}
};
}
private int reassignFailedPlans() {
List<HRegionInfo> reassigningRegions = new ArrayList<HRegionInfo>();
for (Map.Entry<ServerName, List<HRegionInfo>> e : failedPlans.entrySet()) {
LOG.info("Failed assigning " + e.getValue().size()
+ " regions to server " + e.getKey() + ", reassigning them");
reassigningRegions.addAll(failedPlans.remove(e.getKey()));
}
for (HRegionInfo region : reassigningRegions) {
assignmentManager.assign(region, true, true);
}
return reassigningRegions.size();
}
/**
* Manage bulk assigning to a server.
*/
static class SingleServerBulkAssigner implements Runnable {
private final ServerName regionserver;
private final List<HRegionInfo> regions;
private final AssignmentManager assignmentManager;
private final Map<ServerName, List<HRegionInfo>> failedPlans;
SingleServerBulkAssigner(final ServerName regionserver,
final List<HRegionInfo> regions, final AssignmentManager am,
final Map<ServerName, List<HRegionInfo>> failedPlans) {
this.regionserver = regionserver;
this.regions = regions;
this.assignmentManager = am;
this.failedPlans = failedPlans;
}
@Override
public void run() {
try {
if (!assignmentManager.assign(regionserver, regions)) {
failedPlans.put(regionserver, regions);
}
} catch (Throwable t) {
LOG.warn("Failed bulking assigning " + regions.size()
+ " region(s) to " + regionserver.getServerName()
+ ", and continue to bulk assign others", t);
failedPlans.put(regionserver, regions);
}
}
}
}

View File

@ -30,11 +30,8 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -61,7 +58,6 @@ import org.apache.hadoop.hbase.MasterMonitorProtocol;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.RegionServerStatusProtocol;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerLoad;
@ -765,12 +761,11 @@ Server {
/**
* If ServerShutdownHandler is disabled, we enable it and expire those dead
* but not expired servers.
* @throws IOException
*/
private void enableServerShutdownHandler() throws IOException {
private void enableServerShutdownHandler() {
if (!serverShutdownHandlerEnabled) {
serverShutdownHandlerEnabled = true;
this.serverManager.expireDeadNotExpiredServers();
this.serverManager.processQueuedDeadServers();
}
}
@ -845,7 +840,7 @@ Server {
enableSSHandWaitForMeta();
assigned++;
} else {
// Region already assigned. We didnt' assign it. Add to in-memory state.
// Region already assigned. We didn't assign it. Add to in-memory state.
this.assignmentManager.regionOnline(HRegionInfo.FIRST_META_REGIONINFO,
this.catalogTracker.getMetaLocation());
}
@ -911,9 +906,12 @@ Server {
// Now work on our list of found parents. See if any we can clean up.
int fixups = 0;
for (Map.Entry<HRegionInfo, Result> e : offlineSplitParents.entrySet()) {
ServerName sn = HRegionInfo.getServerName(e.getValue());
if (!serverManager.isServerDead(sn)) { // Otherwise, let SSH take care of it
fixups += ServerShutdownHandler.fixupDaughters(
e.getValue(), assignmentManager, catalogTracker);
}
}
if (fixups != 0) {
LOG.info("Scanned the catalog and fixed up " + fixups +
" missing daughter region(s)");
@ -1484,7 +1482,7 @@ Server {
}
this.executorService.submit(new CreateTableHandler(this,
this.fileSystemManager, this.serverManager, hTableDescriptor, conf,
this.fileSystemManager, hTableDescriptor, conf,
newRegions, catalogTracker, assignmentManager));
if (cpHost != null) {
cpHost.postCreateTable(hTableDescriptor, newRegions);

View File

@ -132,6 +132,13 @@ public class RegionStates {
return regionAssignments.containsKey(hri);
}
/**
* @return the server the specified region assigned to; null if not assigned.
*/
public synchronized ServerName getAssignedServer(final HRegionInfo hri) {
return regionAssignments.get(hri);
}
/**
* Wait for the state map to be updated by assignment manager.
*/
@ -519,7 +526,11 @@ public class RegionStates {
try {
Pair<HRegionInfo, ServerName> p =
MetaReader.getRegion(server.getCatalogTracker(), regionName);
return p == null ? null : p.getFirst();
HRegionInfo hri = p == null ? null : p.getFirst();
if (hri != null) {
createRegionState(hri);
}
return hri;
} catch (IOException e) {
server.abort("Aborting because error occoured while reading " +
Bytes.toStringBinary(regionName) + " from .META.", e);

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.Server;
@ -59,11 +58,9 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
@ -78,6 +75,15 @@ import com.google.protobuf.ServiceException;
* (hostname and port) as well as the startcode (timestamp from when the server
* was started). This is used to differentiate a restarted instance of a given
* server from the original instance.
* <p>
* If a sever is known not to be running any more, it is called dead. The dead
* server needs to be handled by a ServerShutdownHandler. If the handler is not
* enabled yet, the server can't be handled right away so it is queued up.
* After the handler is enabled, the server will be submitted to a handler to handle.
* However, the handler may be just partially enabled. If so,
* the server cannot be fully processed, and be queued up for further processing.
* A server is fully processed only after the handler is fully enabled
* and has completed the handling.
*/
@InterfaceAudience.Private
public class ServerManager {
@ -117,12 +123,39 @@ public class ServerManager {
private final long warningSkew;
/**
* Set of region servers which are dead but not expired immediately. If one
* Set of region servers which are dead but not processed immediately. If one
* server died before master enables ServerShutdownHandler, the server will be
* added to set and will be expired through calling
* {@link ServerManager#expireDeadNotExpiredServers()} by master.
* added to this set and will be processed through calling
* {@link ServerManager#processQueuedDeadServers()} by master.
* <p>
* A dead server is a server instance known to be dead, not listed in the /hbase/rs
* znode any more. It may have not been submitted to ServerShutdownHandler yet
* because the handler is not enabled.
* <p>
* A dead server, which has been submitted to ServerShutdownHandler while the
* handler is not enabled, is queued up.
* <p>
* So this is a set of region servers known to be dead but not submitted to
* ServerShutdownHander for processing yet.
*/
private Set<ServerName> deadNotExpiredServers = new HashSet<ServerName>();
private Set<ServerName> queuedDeadServers = new HashSet<ServerName>();
/**
* Set of region servers which are dead and submitted to ServerShutdownHandler to
* process but not fully processed immediately.
* <p>
* If one server died before assignment manager finished the failover cleanup, the server
* will be added to this set and will be processed through calling
* {@link ServerManager#processQueuedDeadServers()} by assignment manager.
* <p>
* For all the region servers in this set, HLog split is already completed.
* <p>
* ServerShutdownHandler processes a dead server submitted to the handler after
* the handler is enabled. It may not be able to complete the processing because root/meta
* is not yet online or master is currently in startup mode. In this case, the dead
* server will be parked in this set temporarily.
*/
private Set<ServerName> requeuedDeadServers = new HashSet<ServerName>();
/**
* Constructor.
@ -325,18 +358,6 @@ public class ServerManager {
return this.onlineServers.get(serverName);
}
/**
* @param address
* @return ServerLoad if serverName is known else null
* @deprecated Use {@link #getLoad(HServerAddress)}
*/
public ServerLoad getLoad(final HServerAddress address) {
ServerName sn = new ServerName(address.toString(), ServerName.NON_STARTCODE);
ServerName actual =
ServerName.findServerWithSameHostnamePort(this.getOnlineServersList(), sn);
return actual == null? null: getLoad(actual);
}
/**
* Compute the average load across all region servers.
* Currently, this uses a very naive computation - just uses the number of
@ -417,13 +438,12 @@ public class ServerManager {
if (!services.isServerShutdownHandlerEnabled()) {
LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
+ "delay expiring server " + serverName);
this.deadNotExpiredServers.add(serverName);
this.queuedDeadServers.add(serverName);
return;
}
if (!this.onlineServers.containsKey(serverName)) {
LOG.warn("Received expiration of " + serverName +
" but server is not currently online");
return;
}
if (this.deadservers.contains(serverName)) {
// TODO: Can this happen? It shouldn't be online in this case?
@ -465,20 +485,47 @@ public class ServerManager {
carryingRoot + ", meta=" + carryingMeta);
}
/**
* Expire the servers which died during master's initialization. It will be
* called after HMaster#assignRootAndMeta.
* @throws IOException
* */
synchronized void expireDeadNotExpiredServers() throws IOException {
if (!services.isServerShutdownHandlerEnabled()) {
throw new IOException("Master hasn't enabled ServerShutdownHandler ");
public synchronized void processDeadServer(final ServerName serverName) {
// When assignment manager is cleaning up the zookeeper nodes and rebuilding the
// in-memory region states, region servers could be down. Root/meta table can and
// should be re-assigned, log splitting can be done too. However, it is better to
// wait till the cleanup is done before re-assigning user regions.
//
// We should not wait in the server shutdown handler thread since it can clog
// the handler threads and root/meta table could not be re-assigned in case
// the corresponding server is down. So we queue them up here instead.
if (!services.getAssignmentManager().isFailoverCleanupDone()) {
requeuedDeadServers.add(serverName);
return;
}
Iterator<ServerName> serverIterator = deadNotExpiredServers.iterator();
this.deadservers.add(serverName);
this.services.getExecutorService().submit(new ServerShutdownHandler(
this.master, this.services, this.deadservers, serverName, false));
}
/**
* Process the servers which died during master's initialization. It will be
* called after HMaster#assignRootAndMeta and AssignmentManager#joinCluster.
* */
synchronized void processQueuedDeadServers() {
if (!services.isServerShutdownHandlerEnabled()) {
LOG.info("Master hasn't enabled ServerShutdownHandler");
}
Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
while (serverIterator.hasNext()) {
expireServer(serverIterator.next());
serverIterator.remove();
}
if (!services.getAssignmentManager().isFailoverCleanupDone()) {
LOG.info("AssignmentManager hasn't finished failover cleanup");
}
serverIterator = requeuedDeadServers.iterator();
while (serverIterator.hasNext()) {
processDeadServer(serverIterator.next());
serverIterator.remove();
}
}
/*
@ -713,11 +760,23 @@ public class ServerManager {
* @return A copy of the internal set of deadNotExpired servers.
*/
Set<ServerName> getDeadNotExpiredServers() {
return new HashSet<ServerName>(this.deadNotExpiredServers);
return new HashSet<ServerName>(this.queuedDeadServers);
}
public boolean isServerOnline(ServerName serverName) {
return onlineServers.containsKey(serverName);
return serverName != null && onlineServers.containsKey(serverName);
}
/**
* Check if a server is known to be dead. A server can be online,
* or known to be dead, or unknown to this manager (i.e, not online,
* not known to be dead either. it is simply not tracked by the
* master any more, for example, a very old previous instance).
*/
public synchronized boolean isServerDead(ServerName serverName) {
return serverName == null || deadservers.isDeadServer(serverName)
|| queuedDeadServers.contains(serverName)
|| requeuedDeadServers.contains(serverName);
}
public void shutdownCluster() {

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
@ -41,9 +40,7 @@ import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.zookeeper.KeeperException;
@ -58,19 +55,15 @@ public class CreateTableHandler extends EventHandler {
private Configuration conf;
private final AssignmentManager assignmentManager;
private final CatalogTracker catalogTracker;
private final ServerManager serverManager;
private final HRegionInfo [] newRegions;
public CreateTableHandler(Server server, MasterFileSystem fileSystemManager,
ServerManager serverManager, HTableDescriptor hTableDescriptor,
Configuration conf, HRegionInfo [] newRegions,
HTableDescriptor hTableDescriptor, Configuration conf, HRegionInfo [] newRegions,
CatalogTracker catalogTracker, AssignmentManager assignmentManager)
throws NotAllMetaRegionsOnlineException, TableExistsException,
IOException {
throws NotAllMetaRegionsOnlineException, TableExistsException, IOException {
super(server, EventType.C_M_CREATE_TABLE);
this.fileSystemManager = fileSystemManager;
this.serverManager = serverManager;
this.hTableDescriptor = hTableDescriptor;
this.conf = conf;
this.newRegions = newRegions;
@ -173,11 +166,10 @@ public class CreateTableHandler extends EventHandler {
}
// 4. Trigger immediate assignment of the regions in round-robin fashion
List<ServerName> servers = serverManager.createDestinationServersList();
try {
List<HRegionInfo> regions = Arrays.asList(newRegions);
assignmentManager.getRegionStates().createRegionStates(regions);
assignmentManager.assignUserRegions(regions, servers);
assignmentManager.assign(regions);
} catch (InterruptedException ie) {
LOG.error("Caught " + ie + " during round-robin assignment");
throw new IOException(ie);

View File

@ -208,7 +208,7 @@ public class EnableTableHandler extends EventHandler {
}
} else {
try {
assignmentManager.assignUserRegionsToOnlineServers(regions);
assignmentManager.assign(regions);
} catch (InterruptedException e) {
LOG.warn("Assignment was interrupted");
Thread.currentThread().interrupt();

View File

@ -222,10 +222,13 @@ public class ServerShutdownHandler extends EventHandler {
// The solution here is to resubmit a ServerShutdownHandler request to process
// user regions on that server so that MetaServerShutdownHandler
// executor pool is always available.
if (isCarryingRoot() || isCarryingMeta()) { // -ROOT- or .META.
this.services.getExecutorService().submit(new ServerShutdownHandler(
this.server, this.services, this.deadServers, serverName, false));
this.deadServers.add(serverName);
//
// If AssignmentManager hasn't finished rebuilding user regions,
// we are not ready to assign dead regions either. So we re-queue up
// the dead server for further processing too.
if (isCarryingRoot() || isCarryingMeta() // -ROOT- or .META.
|| !services.getAssignmentManager().isFailoverCleanupDone()) {
this.services.getServerManager().processDeadServer(serverName);
return;
}
@ -267,6 +270,9 @@ public class ServerShutdownHandler extends EventHandler {
serverName + ", retrying META read", ioe);
}
}
if (this.server.isStopped()) {
throw new IOException("Server is stopped");
}
// Skip regions that were in transition unless CLOSING or PENDING_CLOSE
for (RegionState rit : regionsInTransition) {
@ -347,11 +353,12 @@ public class ServerShutdownHandler extends EventHandler {
toAssignRegions.remove(hri);
}
}
// Get all available servers
List<ServerName> availableServers = services.getServerManager()
.createDestinationServersList();
this.services.getAssignmentManager().assign(toAssignRegions,
availableServers);
try {
this.services.getAssignmentManager().assign(toAssignRegions);
} catch (InterruptedException ie) {
LOG.error("Caught " + ie + " during round-robin assignment");
throw new IOException(ie);
}
}
} finally {
this.deadServers.finish(serverName);

View File

@ -49,7 +49,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.ObjectName;
@ -68,14 +67,15 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.FailedSanityCheckException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionMovedException;
import org.apache.hadoop.hbase.RegionServerStatusProtocol;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
@ -167,11 +167,20 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@ -214,23 +223,12 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.RegionServerStatusProtocol;
import com.google.common.base.Function;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;

View File

@ -27,14 +27,11 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.zookeeper.KeeperException;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* Helper class for table state tracking for use by {@link AssignmentManager}.
* Reads, caches and sets state up in zookeeper. If multiple read/write
@ -305,4 +302,67 @@ public class ZKTable {
}
return disabledTables;
}
/**
* Gets a list of all the tables set as disabled in zookeeper.
* @return Set of disabled tables, empty Set if none
* @throws KeeperException
*/
public static Set<String> getDisabledTables(ZooKeeperWatcher zkw)
throws KeeperException {
return getAllTables(zkw, ZooKeeperProtos.Table.State.DISABLED);
}
/**
* Gets a list of all the tables set as disabling in zookeeper.
* @return Set of disabling tables, empty Set if none
* @throws KeeperException
*/
public static Set<String> getDisablingTables(ZooKeeperWatcher zkw)
throws KeeperException {
return getAllTables(zkw, ZooKeeperProtos.Table.State.DISABLING);
}
/**
* Gets a list of all the tables set as enabling in zookeeper.
* @return Set of enabling tables, empty Set if none
* @throws KeeperException
*/
public static Set<String> getEnablingTables(ZooKeeperWatcher zkw)
throws KeeperException {
return getAllTables(zkw, ZooKeeperProtos.Table.State.ENABLING);
}
/**
* Gets a list of all the tables set as disabled in zookeeper.
* @return Set of disabled tables, empty Set if none
* @throws KeeperException
*/
public static Set<String> getDisabledOrDisablingTables(ZooKeeperWatcher zkw)
throws KeeperException {
return getAllTables(zkw, ZooKeeperProtos.Table.State.DISABLED,
ZooKeeperProtos.Table.State.DISABLING);
}
/**
* Gets a list of all the tables of specified states in zookeeper.
* @return Set of tables of specified states, empty Set if none
* @throws KeeperException
*/
static Set<String> getAllTables(final ZooKeeperWatcher zkw,
final ZooKeeperProtos.Table.State... states) throws KeeperException {
Set<String> allTables = new HashSet<String>();
List<String> children =
ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode);
for (String child: children) {
ZooKeeperProtos.Table.State state = ZKTableReadOnly.getTableState(zkw, child);
for (ZooKeeperProtos.Table.State expectedState: states) {
if (state == expectedState) {
allTables.add(child);
break;
}
}
}
return allTables;
}
}

View File

@ -398,11 +398,12 @@ public class TestZooKeeper {
ZKUtil.createAndFailSilent(zk2, aclZnode);
}
@Test
/**
* Test should not fail with NPE when getChildDataAndWatchForNewChildren
* invoked with wrongNode
*/
@Test
@SuppressWarnings("deprecation")
public void testGetChildDataAndWatchForNewChildrenShouldNotThrowNPE()
throws Exception {
ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
@ -443,7 +444,7 @@ public class TestZooKeeper {
* Tests whether the logs are split when master recovers from a expired zookeeper session and an
* RS goes down.
*/
@Test(timeout = 60000)
@Test(timeout = 180000)
public void testLogSplittingAfterMasterRecoveryDueToZKExpiry() throws IOException,
KeeperException, InterruptedException {
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();

View File

@ -155,7 +155,7 @@ public class TestHCM {
table.put(put2);
assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
// We can wait for all regions to be onlines, that makes log reading easier when debugging
// We can wait for all regions to be online, that makes log reading easier when debugging
while (TEST_UTIL.getMiniHBaseCluster().getMaster().
getAssignmentManager().getRegionStates().isRegionsInTransition()) {
}

View File

@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.io.InterruptedIOException;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
@ -32,9 +31,6 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

View File

@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@ -337,6 +336,7 @@ public class TestAssignmentManager {
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server,
this.serverManager, ct, balancer, executor, null);
am.failoverCleanupDone.set(true);
try {
// Make sure our new AM gets callbacks; once registered, can't unregister.
// Thats ok because we make a new zk watcher for each test.
@ -451,9 +451,10 @@ public class TestAssignmentManager {
// Create and startup an executor. This is used by AssignmentManager
// handling zk callbacks.
ExecutorService executor = startupMasterExecutor("testSSHWhenSplitRegionInProgress");
// We need a mocked catalog tracker.
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
ZKAssign.deleteAllNodes(this.watcher);
// Create an AM.
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
this.server, this.serverManager);
@ -501,6 +502,8 @@ public class TestAssignmentManager {
// We need a mocked catalog tracker.
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(server.getConfiguration());
ZKAssign.deleteAllNodes(this.watcher);
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server,
this.serverManager, ct, balancer, executor, null);
@ -521,6 +524,7 @@ public class TestAssignmentManager {
String node = ZKAssign.getNodeName(this.watcher, REGIONINFO.getEncodedName());
// create znode in M_ZK_REGION_CLOSING state.
ZKUtil.createAndWatch(this.watcher, node, data.toByteArray());
try {
processServerShutdownHandler(ct, am, false);
// check znode deleted or not.
@ -588,6 +592,7 @@ public class TestAssignmentManager {
Mockito.when(services.getZooKeeper()).thenReturn(this.watcher);
ServerShutdownHandler handler = new ServerShutdownHandler(this.server,
services, deadServers, SERVERNAME_A, false);
am.failoverCleanupDone.set(true);
handler.process();
// The region in r will have been assigned. It'll be up in zk as unassigned.
}
@ -667,7 +672,7 @@ public class TestAssignmentManager {
};
((ZooKeeperWatcher) zkw).registerListener(am);
Mockito.doThrow(new InterruptedException()).when(recoverableZk)
.getChildren("/hbase/unassigned", zkw);
.getChildren("/hbase/unassigned", null);
am.setWatcher((ZooKeeperWatcher) zkw);
try {
am.processDeadServersAndRegionsInTransition(null);
@ -791,8 +796,8 @@ public class TestAssignmentManager {
EventType.RS_ZK_REGION_OPENING, version);
RegionTransition rt = RegionTransition.createRegionTransition(EventType.RS_ZK_REGION_OPENING,
REGIONINFO.getRegionName(), SERVERNAME_A, HConstants.EMPTY_BYTE_ARRAY);
Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers =
new HashMap<ServerName, List<Pair<HRegionInfo, Result>>>();
Map<ServerName, List<HRegionInfo>> deadServers =
new HashMap<ServerName, List<HRegionInfo>>();
deadServers.put(SERVERNAME_A, null);
version = ZKAssign.getVersion(this.watcher, REGIONINFO);
am.gate.set(false);
@ -969,20 +974,12 @@ public class TestAssignmentManager {
@Override
boolean processRegionInTransition(String encodedRegionName,
HRegionInfo regionInfo,
Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers)
Map<ServerName, List<HRegionInfo>> deadServers)
throws KeeperException, IOException {
this.processRITInvoked = true;
return super.processRegionInTransition(encodedRegionName, regionInfo,
deadServers);
}
@Override
void processRegionsInTransition(final RegionTransition rt,
final HRegionInfo regionInfo,
final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers,
final int expectedVersion) throws KeeperException {
while (this.gate.get()) Threads.sleep(1);
super.processRegionsInTransition(rt, regionInfo, deadServers, expectedVersion);
}
@Override
public void assign(HRegionInfo region, boolean setOfflineInZK, boolean forceNewPlan,
@ -992,10 +989,10 @@ public class TestAssignmentManager {
}
@Override
public void assign(java.util.List<HRegionInfo> regions, java.util.List<ServerName> servers)
{
public void assign(List<HRegionInfo> regions)
throws IOException, InterruptedException {
assignInvoked = true;
};
}
/** reset the watcher */
void setWatcher(ZooKeeperWatcher watcher) {

View File

@ -781,34 +781,14 @@ public class TestMasterFailover {
region = enabledRegions.remove(0);
regionsThatShouldBeOnline.add(region);
master.getAssignmentManager().getRegionStates().updateRegionState(
region, RegionState.State.PENDING_OPEN, null);
region, RegionState.State.PENDING_OPEN);
ZKAssign.createNodeOffline(zkw, region, master.getServerName());
// PENDING_OPEN and disabled
region = disabledRegions.remove(0);
regionsThatShouldBeOffline.add(region);
master.getAssignmentManager().getRegionStates().updateRegionState(
region, RegionState.State.PENDING_OPEN, null);
region, RegionState.State.PENDING_OPEN);
ZKAssign.createNodeOffline(zkw, region, master.getServerName());
// This test is bad. It puts up a PENDING_CLOSE but doesn't say what
// server we were PENDING_CLOSE against -- i.e. an entry in
// AssignmentManager#regions. W/o a server, we NPE trying to resend close.
// In past, there was wonky logic that had us reassign region if no server
// at tail of the unassign. This was removed. Commenting out for now.
// TODO: Remove completely.
/*
// PENDING_CLOSE and enabled
region = enabledRegions.remove(0);
LOG.info("Setting PENDING_CLOSE enabled " + region.getEncodedName());
regionsThatShouldBeOnline.add(region);
master.assignmentManager.regionsInTransition.put(region.getEncodedName(),
new RegionState(region, RegionState.State.PENDING_CLOSE, 0));
// PENDING_CLOSE and disabled
region = disabledRegions.remove(0);
LOG.info("Setting PENDING_CLOSE disabled " + region.getEncodedName());
regionsThatShouldBeOffline.add(region);
master.assignmentManager.regionsInTransition.put(region.getEncodedName(),
new RegionState(region, RegionState.State.PENDING_CLOSE, 0));
*/
// Failover should be completed, now wait for no RIT
log("Waiting for no more RIT");

View File

@ -131,8 +131,8 @@ public class TestReplication {
LOG.info("Setup second Zk");
CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
utility1.startMiniCluster(2);
utility2.startMiniCluster(2);
utility1.startMiniCluster(3);
utility2.startMiniCluster(3);
HTableDescriptor table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);

View File

@ -52,7 +52,6 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.client.AdminProtocol;
@ -65,7 +64,6 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.io.hfile.TestHFile;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
@ -78,6 +76,7 @@ import org.apache.hadoop.hbase.util.HBaseFsck.HbckInfo;
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
@ -138,6 +137,7 @@ public class TestHBaseFsck {
// point to a different region server
HTable meta = new HTable(conf, HTableDescriptor.META_TABLEDESC.getName());
ResultScanner scanner = meta.getScanner(new Scan());
HRegionInfo hri = null;
resforloop:
for (Result res : scanner) {
@ -158,6 +158,7 @@ public class TestHBaseFsck {
put.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
Bytes.toBytes(sn.getStartcode()));
meta.put(put);
hri = HRegionInfo.getHRegionInfo(res);
break resforloop;
}
}
@ -167,10 +168,8 @@ public class TestHBaseFsck {
assertErrors(doFsck(conf, true), new ERROR_CODE[]{
ERROR_CODE.SERVER_DOES_NOT_MATCH_META});
// fixing assignments require opening regions is not synchronous. To make
// the test pass consistently so for now we bake in some sleep to let it
// finish. 1s seems sufficient.
Thread.sleep(1000);
TEST_UTIL.getHBaseCluster().getMaster()
.getAssignmentManager().waitForAssignment(hri);
// Should be fixed now
assertNoErrors(doFsck(conf, false));
@ -318,18 +317,6 @@ public class TestHBaseFsck {
}
tbl.put(puts);
tbl.flushCommits();
long endTime = System.currentTimeMillis() + 60000;
while (!TEST_UTIL.getHBaseAdmin().isTableEnabled(tablename)) {
try {
if (System.currentTimeMillis() > endTime) {
fail("Failed to enable table " + tablename + " after waiting for 60 sec");
}
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
fail("Interrupted when waiting table " + tablename + " to be enabled");
}
}
return tbl;
}
@ -1117,15 +1104,19 @@ public class TestHBaseFsck {
// Region of disable table was opened on RS
TEST_UTIL.getHBaseAdmin().disableTable(table);
// Mess up ZKTable state, otherwise, can't open the region
ZKTable zkTable = cluster.getMaster().getAssignmentManager().getZKTable();
zkTable.setEnabledTable(table);
HRegionInfo region = disabledRegions.remove(0);
ZKAssign.createNodeOffline(zkw, region, serverName);
ProtobufUtil.openRegion(hrs, region);
int iTimes = 0;
byte[] regionName = region.getRegionName();
while (true) {
byte[] data = ZKAssign.getData(zkw, region.getEncodedName());
RegionTransition rt = data == null ? null : RegionTransition.parseFrom(data);
if (rt == null || rt.getEventType() == EventType.RS_ZK_REGION_OPENED) {
if (cluster.getServerWith(regionName) != -1) {
// Now, region is deployed, reset the table state back
zkTable.setDisabledTable(table);
break;
}
Thread.sleep(100);