HBASE-11047 Remove TimeoutMonitor
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1589439 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
73a0b7ad3c
commit
891b9f6a20
|
@ -609,18 +609,18 @@ public class ZKAssign {
|
|||
}
|
||||
|
||||
/**
|
||||
* Retransitions an existing unassigned node for the specified region which is
|
||||
* currently in the OPENING state to be in the OPENING state.
|
||||
* Confirm an existing unassigned node for the specified region which is
|
||||
* currently in the OPENING state to be still in the OPENING state on
|
||||
* the specified server.
|
||||
*
|
||||
* <p>Does not transition nodes from other states. If for some reason the
|
||||
* node could not be transitioned, the method returns -1. If the transition
|
||||
* is successful, the version of the node rewritten as OPENING is returned.
|
||||
* <p>If for some reason the check fails, the method returns -1. Otherwise,
|
||||
* the version of the node (same as the expected version) is returned.
|
||||
*
|
||||
* <p>This method can fail and return -1 for three different reasons:
|
||||
* <ul><li>Unassigned node for this region does not exist</li>
|
||||
* <li>Unassigned node for this region is not in OPENING state</li>
|
||||
* <li>After verifying OPENING state, update fails because of wrong version
|
||||
* (someone else already transitioned the node)</li>
|
||||
* <li>After verifying OPENING state, the server name or the version of the
|
||||
* node doesn't match</li>
|
||||
* </ul>
|
||||
*
|
||||
* <p>Does not set any watches.
|
||||
|
@ -631,12 +631,11 @@ public class ZKAssign {
|
|||
* @param zkw zk reference
|
||||
* @param region region to be transitioned to opening
|
||||
* @param serverName server transition happens on
|
||||
* @param updateZNode write the znode. If false, we only check.
|
||||
* @return version of node after transition, -1 if unsuccessful transition
|
||||
* @throws KeeperException if unexpected zookeeper exception
|
||||
*/
|
||||
public static int retransitionNodeOpening(ZooKeeperWatcher zkw,
|
||||
HRegionInfo region, ServerName serverName, int expectedVersion, boolean updateZNode)
|
||||
public static int confirmNodeOpening(ZooKeeperWatcher zkw,
|
||||
HRegionInfo region, ServerName serverName, int expectedVersion)
|
||||
throws KeeperException {
|
||||
|
||||
String encoded = region.getEncodedName();
|
||||
|
@ -677,33 +676,7 @@ public class ZKAssign {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// We don't have to write the new state: the check is complete.
|
||||
if (!updateZNode){
|
||||
return expectedVersion;
|
||||
}
|
||||
|
||||
// Write new data, ensuring data has not changed since we last read it
|
||||
try {
|
||||
rt = RegionTransition.createRegionTransition(
|
||||
EventType.RS_ZK_REGION_OPENING, region.getRegionName(), serverName, null);
|
||||
if(!ZKUtil.setData(zkw, node, rt.toByteArray(), stat.getVersion())) {
|
||||
LOG.warn(zkw.prefix("Attempt to retransition the opening state of the " +
|
||||
"unassigned node for " + encoded + " failed, " +
|
||||
"the node existed and was in the expected state but then when " +
|
||||
"setting data we got a version mismatch"));
|
||||
return -1;
|
||||
}
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug(zkw.prefix("Retransition opening state of node " + encoded));
|
||||
}
|
||||
return stat.getVersion() + 1;
|
||||
} catch (KeeperException.NoNodeException nne) {
|
||||
LOG.warn(zkw.prefix("Attempt to retransition the opening state of the " +
|
||||
"unassigned node for " + encoded + " failed, " +
|
||||
"the node existed and was in the expected state but then when " +
|
||||
"setting data it no longer existed"));
|
||||
return -1;
|
||||
}
|
||||
return expectedVersion;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -32,7 +32,6 @@ import java.util.NavigableMap;
|
|||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -44,7 +43,6 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Chore;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
|
@ -52,7 +50,6 @@ import org.apache.hadoop.hbase.NotServingRegionException;
|
|||
import org.apache.hadoop.hbase.RegionTransition;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
|
@ -95,7 +92,6 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
|
|||
import org.apache.zookeeper.KeeperException.NodeExistsException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.LinkedHashMultimap;
|
||||
|
||||
/**
|
||||
|
@ -112,14 +108,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
|
||||
-1, -1L);
|
||||
|
||||
public static final String ASSIGNMENT_TIMEOUT = "hbase.master.assignment.timeoutmonitor.timeout";
|
||||
public static final int DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT = 600000;
|
||||
public static final String ASSIGNMENT_TIMEOUT_MANAGEMENT = "hbase.assignment.timeout.management";
|
||||
public static final boolean DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT = false;
|
||||
|
||||
public static final String ALREADY_IN_TRANSITION_WAITTIME
|
||||
static final String ALREADY_IN_TRANSITION_WAITTIME
|
||||
= "hbase.assignment.already.intransition.waittime";
|
||||
public static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
|
||||
static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
|
||||
|
||||
protected final Server server;
|
||||
|
||||
|
@ -129,10 +120,6 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
|
||||
private CatalogTracker catalogTracker;
|
||||
|
||||
protected final TimeoutMonitor timeoutMonitor;
|
||||
|
||||
private final TimerUpdater timerUpdater;
|
||||
|
||||
private LoadBalancer balancer;
|
||||
|
||||
private final MetricsAssignmentManager metricsAssignmentManager;
|
||||
|
@ -176,12 +163,6 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
|
||||
private final ZKTable zkTable;
|
||||
|
||||
/**
|
||||
* Contains the server which need to update timer, these servers will be
|
||||
* handled by {@link TimerUpdater}
|
||||
*/
|
||||
private final ConcurrentSkipListSet<ServerName> serversInUpdatingTimer;
|
||||
|
||||
private final ExecutorService executorService;
|
||||
|
||||
// For unit tests, keep track of calls to ClosedRegionHandler
|
||||
|
@ -223,9 +204,6 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
*/
|
||||
protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
|
||||
|
||||
/** Is the TimeOutManagement activated **/
|
||||
private final boolean tomActivated;
|
||||
|
||||
/**
|
||||
* A map to track the count a region fails to open in a row.
|
||||
* So that we don't try to open a region forever if the failure is
|
||||
|
@ -268,23 +246,6 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
|
||||
HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
|
||||
FavoredNodeLoadBalancer.class);
|
||||
this.tomActivated = conf.getBoolean(
|
||||
ASSIGNMENT_TIMEOUT_MANAGEMENT, DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT);
|
||||
if (tomActivated){
|
||||
this.serversInUpdatingTimer = new ConcurrentSkipListSet<ServerName>();
|
||||
this.timeoutMonitor = new TimeoutMonitor(
|
||||
conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000),
|
||||
server, serverManager,
|
||||
conf.getInt(ASSIGNMENT_TIMEOUT, DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT));
|
||||
this.timerUpdater = new TimerUpdater(conf.getInt(
|
||||
"hbase.master.assignment.timerupdater.period", 10000), server);
|
||||
Threads.setDaemonThreadRunning(timerUpdater.getThread(),
|
||||
server.getServerName() + ".timerUpdater");
|
||||
} else {
|
||||
this.serversInUpdatingTimer = null;
|
||||
this.timeoutMonitor = null;
|
||||
this.timerUpdater = null;
|
||||
}
|
||||
try {
|
||||
this.zkTable = new ZKTable(this.watcher);
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -315,13 +276,6 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
this.metricsAssignmentManager = new MetricsAssignmentManager();
|
||||
}
|
||||
|
||||
void startTimeOutMonitor() {
|
||||
if (tomActivated) {
|
||||
Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), server.getServerName()
|
||||
+ ".timeoutMonitor");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Instance of ZKTable.
|
||||
*/
|
||||
|
@ -1279,8 +1233,6 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
|
||||
// Remove plan if one.
|
||||
clearRegionPlan(regionInfo);
|
||||
// Add the server to serversInUpdatingTimer
|
||||
addToServersInUpdatingTimer(sn);
|
||||
balancer.regionOnline(regionInfo, sn);
|
||||
}
|
||||
|
||||
|
@ -1321,53 +1273,6 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the server to the set serversInUpdatingTimer, then {@link TimerUpdater}
|
||||
* will update timers for this server in background
|
||||
* @param sn
|
||||
*/
|
||||
private void addToServersInUpdatingTimer(final ServerName sn) {
|
||||
if (tomActivated){
|
||||
this.serversInUpdatingTimer.add(sn);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Touch timers for all regions in transition that have the passed
|
||||
* <code>sn</code> in common.
|
||||
* Call this method whenever a server checks in. Doing so helps the case where
|
||||
* a new regionserver has joined the cluster and it's been given 1k regions to
|
||||
* open. If this method is tickled every time the region reports in a
|
||||
* successful open then the 1k-th region won't be timed out just because it's
|
||||
* sitting behind the open of 999 other regions. This method is NOT used
|
||||
* as part of bulk assign -- there we have a different mechanism for extending
|
||||
* the regions in transition timer (we turn it off temporarily -- because
|
||||
* there is no regionplan involved when bulk assigning).
|
||||
* @param sn
|
||||
*/
|
||||
private void updateTimers(final ServerName sn) {
|
||||
Preconditions.checkState(tomActivated);
|
||||
if (sn == null) return;
|
||||
|
||||
// This loop could be expensive.
|
||||
// First make a copy of current regionPlan rather than hold sync while
|
||||
// looping because holding sync can cause deadlock. Its ok in this loop
|
||||
// if the Map we're going against is a little stale
|
||||
List<Map.Entry<String, RegionPlan>> rps;
|
||||
synchronized(this.regionPlans) {
|
||||
rps = new ArrayList<Map.Entry<String, RegionPlan>>(regionPlans.entrySet());
|
||||
}
|
||||
|
||||
for (Map.Entry<String, RegionPlan> e : rps) {
|
||||
if (e.getValue() != null && e.getKey() != null && sn.equals(e.getValue().getDestination())) {
|
||||
RegionState regionState = regionStates.getRegionTransitionState(e.getKey());
|
||||
if (regionState != null) {
|
||||
regionState.updateTimestampToNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks the region as offline. Removes it from regions in transition and
|
||||
* removes in-memory assignment information.
|
||||
|
@ -1745,7 +1650,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
LOG.warn("Failed to unassign "
|
||||
+ region.getRegionNameAsString() + " since interrupted", ie);
|
||||
Thread.currentThread().interrupt();
|
||||
if (!tomActivated && state != null) {
|
||||
if (state != null) {
|
||||
regionStates.updateRegionState(region, State.FAILED_CLOSE);
|
||||
}
|
||||
return;
|
||||
|
@ -1761,7 +1666,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
}
|
||||
// Run out of attempts
|
||||
if (!tomActivated && state != null) {
|
||||
if (state != null) {
|
||||
regionStates.updateRegionState(region, State.FAILED_CLOSE);
|
||||
}
|
||||
}
|
||||
|
@ -1886,21 +1791,17 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
if (plan == null) {
|
||||
LOG.warn("Unable to determine a plan to assign " + region);
|
||||
if (tomActivated){
|
||||
this.timeoutMonitor.setAllRegionServersOffline(true);
|
||||
} else {
|
||||
if (region.isMetaRegion()) {
|
||||
try {
|
||||
Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
|
||||
if (i == maximumAttempts) i = 1;
|
||||
continue;
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("Got exception while waiting for hbase:meta assignment");
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
if (region.isMetaRegion()) {
|
||||
try {
|
||||
Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
|
||||
if (i == maximumAttempts) i = 1;
|
||||
continue;
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("Got exception while waiting for hbase:meta assignment");
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
regionStates.updateRegionState(region, State.FAILED_OPEN);
|
||||
}
|
||||
regionStates.updateRegionState(region, State.FAILED_OPEN);
|
||||
return;
|
||||
}
|
||||
if (setOfflineInZK && versionOfOfflineNode == -1) {
|
||||
|
@ -2015,10 +1916,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Failed to assign "
|
||||
+ region.getRegionNameAsString() + " since interrupted", ie);
|
||||
regionStates.updateRegionState(region, State.FAILED_OPEN);
|
||||
Thread.currentThread().interrupt();
|
||||
if (!tomActivated) {
|
||||
regionStates.updateRegionState(region, State.FAILED_OPEN);
|
||||
}
|
||||
return;
|
||||
}
|
||||
} else if (retry) {
|
||||
|
@ -2053,11 +1952,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
LOG.warn("Failed to get region plan", e);
|
||||
}
|
||||
if (newPlan == null) {
|
||||
if (tomActivated) {
|
||||
this.timeoutMonitor.setAllRegionServersOffline(true);
|
||||
} else {
|
||||
regionStates.updateRegionState(region, State.FAILED_OPEN);
|
||||
}
|
||||
regionStates.updateRegionState(region, State.FAILED_OPEN);
|
||||
LOG.warn("Unable to find a viable location to assign region " +
|
||||
region.getRegionNameAsString());
|
||||
return;
|
||||
|
@ -2080,19 +1975,15 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Failed to assign "
|
||||
+ region.getRegionNameAsString() + " since interrupted", ie);
|
||||
regionStates.updateRegionState(region, State.FAILED_OPEN);
|
||||
Thread.currentThread().interrupt();
|
||||
if (!tomActivated) {
|
||||
regionStates.updateRegionState(region, State.FAILED_OPEN);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Run out of attempts
|
||||
if (!tomActivated) {
|
||||
regionStates.updateRegionState(region, State.FAILED_OPEN);
|
||||
}
|
||||
regionStates.updateRegionState(region, State.FAILED_OPEN);
|
||||
} finally {
|
||||
metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
|
||||
}
|
||||
|
@ -2893,191 +2784,10 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update timers for all regions in transition going against the server in the
|
||||
* serversInUpdatingTimer.
|
||||
*/
|
||||
public class TimerUpdater extends Chore {
|
||||
|
||||
public TimerUpdater(final int period, final Stoppable stopper) {
|
||||
super("AssignmentTimerUpdater", period, stopper);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void chore() {
|
||||
Preconditions.checkState(tomActivated);
|
||||
ServerName serverToUpdateTimer = null;
|
||||
while (!serversInUpdatingTimer.isEmpty() && !stopper.isStopped()) {
|
||||
if (serverToUpdateTimer == null) {
|
||||
serverToUpdateTimer = serversInUpdatingTimer.first();
|
||||
} else {
|
||||
serverToUpdateTimer = serversInUpdatingTimer
|
||||
.higher(serverToUpdateTimer);
|
||||
}
|
||||
if (serverToUpdateTimer == null) {
|
||||
break;
|
||||
}
|
||||
updateTimers(serverToUpdateTimer);
|
||||
serversInUpdatingTimer.remove(serverToUpdateTimer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Monitor to check for time outs on region transition operations
|
||||
*/
|
||||
public class TimeoutMonitor extends Chore {
|
||||
private boolean allRegionServersOffline = false;
|
||||
private ServerManager serverManager;
|
||||
private final int timeout;
|
||||
|
||||
/**
|
||||
* Creates a periodic monitor to check for time outs on region transition
|
||||
* operations. This will deal with retries if for some reason something
|
||||
* doesn't happen within the specified timeout.
|
||||
* @param period
|
||||
* @param stopper When {@link Stoppable#isStopped()} is true, this thread will
|
||||
* cleanup and exit cleanly.
|
||||
* @param timeout
|
||||
*/
|
||||
public TimeoutMonitor(final int period, final Stoppable stopper,
|
||||
ServerManager serverManager,
|
||||
final int timeout) {
|
||||
super("AssignmentTimeoutMonitor", period, stopper);
|
||||
this.timeout = timeout;
|
||||
this.serverManager = serverManager;
|
||||
}
|
||||
|
||||
private synchronized void setAllRegionServersOffline(
|
||||
boolean allRegionServersOffline) {
|
||||
this.allRegionServersOffline = allRegionServersOffline;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void chore() {
|
||||
Preconditions.checkState(tomActivated);
|
||||
boolean noRSAvailable = this.serverManager.createDestinationServersList().isEmpty();
|
||||
|
||||
// Iterate all regions in transition checking for time outs
|
||||
long now = System.currentTimeMillis();
|
||||
// no lock concurrent access ok: we will be working on a copy, and it's java-valid to do
|
||||
// a copy while another thread is adding/removing items
|
||||
for (String regionName : regionStates.getRegionsInTransition().keySet()) {
|
||||
RegionState regionState = regionStates.getRegionTransitionState(regionName);
|
||||
if (regionState == null) continue;
|
||||
|
||||
if (regionState.getStamp() + timeout <= now) {
|
||||
// decide on action upon timeout
|
||||
actOnTimeOut(regionState);
|
||||
} else if (this.allRegionServersOffline && !noRSAvailable) {
|
||||
RegionPlan existingPlan = regionPlans.get(regionName);
|
||||
if (existingPlan == null
|
||||
|| !this.serverManager.isServerOnline(existingPlan
|
||||
.getDestination())) {
|
||||
// if some RSs just came back online, we can start the assignment
|
||||
// right away
|
||||
actOnTimeOut(regionState);
|
||||
}
|
||||
}
|
||||
}
|
||||
setAllRegionServersOffline(noRSAvailable);
|
||||
}
|
||||
|
||||
private void actOnTimeOut(RegionState regionState) {
|
||||
HRegionInfo regionInfo = regionState.getRegion();
|
||||
LOG.info("Regions in transition timed out: " + regionState);
|
||||
// Expired! Do a retry.
|
||||
switch (regionState.getState()) {
|
||||
case CLOSED:
|
||||
LOG.info("Region " + regionInfo.getEncodedName()
|
||||
+ " has been CLOSED for too long, waiting on queued "
|
||||
+ "ClosedRegionHandler to run or server shutdown");
|
||||
// Update our timestamp.
|
||||
regionState.updateTimestampToNow();
|
||||
break;
|
||||
case OFFLINE:
|
||||
LOG.info("Region has been OFFLINE for too long, " + "reassigning "
|
||||
+ regionInfo.getRegionNameAsString() + " to a random server");
|
||||
invokeAssign(regionInfo);
|
||||
break;
|
||||
case PENDING_OPEN:
|
||||
LOG.info("Region has been PENDING_OPEN for too "
|
||||
+ "long, reassigning region=" + regionInfo.getRegionNameAsString());
|
||||
invokeAssign(regionInfo);
|
||||
break;
|
||||
case OPENING:
|
||||
processOpeningState(regionInfo);
|
||||
break;
|
||||
case OPEN:
|
||||
LOG.error("Region has been OPEN for too long, " +
|
||||
"we don't know where region was opened so can't do anything");
|
||||
regionState.updateTimestampToNow();
|
||||
break;
|
||||
|
||||
case PENDING_CLOSE:
|
||||
LOG.info("Region has been PENDING_CLOSE for too "
|
||||
+ "long, running forced unassign again on region="
|
||||
+ regionInfo.getRegionNameAsString());
|
||||
invokeUnassign(regionInfo);
|
||||
break;
|
||||
case CLOSING:
|
||||
LOG.info("Region has been CLOSING for too " +
|
||||
"long, this should eventually complete or the server will " +
|
||||
"expire, send RPC again");
|
||||
invokeUnassign(regionInfo);
|
||||
break;
|
||||
|
||||
case SPLIT:
|
||||
case SPLITTING:
|
||||
case FAILED_OPEN:
|
||||
case FAILED_CLOSE:
|
||||
case MERGING:
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new IllegalStateException("Received event is not valid.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void processOpeningState(HRegionInfo regionInfo) {
|
||||
LOG.info("Region has been OPENING for too long, reassigning region="
|
||||
+ regionInfo.getRegionNameAsString());
|
||||
// Should have a ZK node in OPENING state
|
||||
try {
|
||||
String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName());
|
||||
Stat stat = new Stat();
|
||||
byte [] data = ZKAssign.getDataNoWatch(watcher, node, stat);
|
||||
if (data == null) {
|
||||
LOG.warn("Data is null, node " + node + " no longer exists");
|
||||
return;
|
||||
}
|
||||
RegionTransition rt = RegionTransition.parseFrom(data);
|
||||
EventType et = rt.getEventType();
|
||||
if (et == EventType.RS_ZK_REGION_OPENED) {
|
||||
LOG.debug("Region has transitioned to OPENED, allowing "
|
||||
+ "watched event handlers to process");
|
||||
return;
|
||||
} else if (et != EventType.RS_ZK_REGION_OPENING && et != EventType.RS_ZK_REGION_FAILED_OPEN ) {
|
||||
LOG.warn("While timing out a region, found ZK node in unexpected state: " + et);
|
||||
return;
|
||||
}
|
||||
invokeAssign(regionInfo);
|
||||
} catch (KeeperException ke) {
|
||||
LOG.error("Unexpected ZK exception timing out CLOSING region", ke);
|
||||
} catch (DeserializationException e) {
|
||||
LOG.error("Unexpected exception parsing CLOSING region", e);
|
||||
}
|
||||
}
|
||||
|
||||
void invokeAssign(HRegionInfo regionInfo) {
|
||||
threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
|
||||
}
|
||||
|
||||
private void invokeUnassign(HRegionInfo regionInfo) {
|
||||
threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
|
||||
}
|
||||
|
||||
public boolean isCarryingMeta(ServerName serverName) {
|
||||
return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
|
||||
}
|
||||
|
@ -3214,10 +2924,6 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
|
||||
public void stop() {
|
||||
shutdown(); // Stop executor service, etc
|
||||
if (tomActivated){
|
||||
this.timeoutMonitor.interrupt();
|
||||
this.timerUpdater.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -503,8 +503,6 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
}
|
||||
}
|
||||
|
||||
this.assignmentManager.startTimeOutMonitor();
|
||||
|
||||
// get a list for previously failed RS which need log splitting work
|
||||
// we recover hbase:meta region servers inside master initialization and
|
||||
// handle other failed servers in SSH in order to start up master node ASAP
|
||||
|
|
|
@ -1,47 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
|
||||
/**
|
||||
* A callable object that invokes the corresponding action that needs to be
|
||||
* taken for unassignment of a region in transition. Implementing as future
|
||||
* callable we are able to act on the timeout asynchronously.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class UnAssignCallable implements Callable<Object> {
|
||||
private AssignmentManager assignmentManager;
|
||||
|
||||
private HRegionInfo hri;
|
||||
|
||||
public UnAssignCallable(AssignmentManager assignmentManager, HRegionInfo hri) {
|
||||
this.assignmentManager = assignmentManager;
|
||||
this.hri = hri;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object call() throws Exception {
|
||||
assignmentManager.unassign(hri, true);
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -111,7 +111,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
|||
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
|
||||
import org.apache.hadoop.hbase.ipc.RpcCallContext;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
|
||||
|
@ -3159,9 +3158,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
|
||||
2000);
|
||||
// How often to send a progress report (default 1/2 master timeout)
|
||||
int period = this.conf.getInt("hbase.hstore.report.period",
|
||||
this.conf.getInt(AssignmentManager.ASSIGNMENT_TIMEOUT,
|
||||
AssignmentManager.DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT) / 2);
|
||||
int period = this.conf.getInt("hbase.hstore.report.period", 300000);
|
||||
long lastReport = EnvironmentEdgeManager.currentTimeMillis();
|
||||
|
||||
while ((entry = reader.next()) != null) {
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
|||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.executor.EventHandler;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
|
@ -51,9 +50,6 @@ public class OpenRegionHandler extends EventHandler {
|
|||
private final HRegionInfo regionInfo;
|
||||
private final HTableDescriptor htd;
|
||||
|
||||
private boolean tomActivated;
|
||||
private int assignmentTimeout;
|
||||
|
||||
// We get version of our znode at start of open process and monitor it across
|
||||
// the total open. We'll fail the open if someone hijacks our znode; we can
|
||||
// tell this has happened if version is not as expected.
|
||||
|
@ -82,12 +78,6 @@ public class OpenRegionHandler extends EventHandler {
|
|||
this.regionInfo = regionInfo;
|
||||
this.htd = htd;
|
||||
this.versionOfOfflineNode = versionOfOfflineNode;
|
||||
tomActivated = this.server.getConfiguration().
|
||||
getBoolean(AssignmentManager.ASSIGNMENT_TIMEOUT_MANAGEMENT,
|
||||
AssignmentManager.DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT);
|
||||
assignmentTimeout = this.server.getConfiguration().
|
||||
getInt(AssignmentManager.ASSIGNMENT_TIMEOUT,
|
||||
AssignmentManager.DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT);
|
||||
}
|
||||
|
||||
public HRegionInfo getRegionInfo() {
|
||||
|
@ -246,27 +236,27 @@ public class OpenRegionHandler extends EventHandler {
|
|||
PostOpenDeployTasksThread t = new PostOpenDeployTasksThread(r,
|
||||
this.server, this.rsServices, signaller);
|
||||
t.start();
|
||||
// Total timeout for meta edit. If we fail adding the edit then close out
|
||||
// the region and let it be assigned elsewhere.
|
||||
long timeout = assignmentTimeout * 10;
|
||||
// Post open deploy task:
|
||||
// meta => update meta location in ZK
|
||||
// other region => update meta
|
||||
// It could fail if ZK/meta is not available and
|
||||
// the update runs out of retries.
|
||||
long now = System.currentTimeMillis();
|
||||
long endTime = now + timeout;
|
||||
// Let our period at which we update OPENING state to be 1/3rd of the
|
||||
// regions-in-transition timeout period.
|
||||
long period = Math.max(1, assignmentTimeout/ 3);
|
||||
long lastUpdate = now;
|
||||
boolean tickleOpening = true;
|
||||
while (!signaller.get() && t.isAlive() && !this.server.isStopped() &&
|
||||
!this.rsServices.isStopping() && (endTime > now)) {
|
||||
!this.rsServices.isStopping() && isRegionStillOpening()) {
|
||||
long elapsed = now - lastUpdate;
|
||||
if (elapsed > period) {
|
||||
if (elapsed > 120000) { // 2 minutes, no need to tickleOpening too often
|
||||
// Only tickle OPENING if postOpenDeployTasks is taking some time.
|
||||
lastUpdate = now;
|
||||
tickleOpening = tickleOpening("post_open_deploy");
|
||||
}
|
||||
synchronized (signaller) {
|
||||
try {
|
||||
signaller.wait(period);
|
||||
// Wait for 10 seconds, so that server shutdown
|
||||
// won't take too long if this thread happens to run.
|
||||
signaller.wait(10000);
|
||||
} catch (InterruptedException e) {
|
||||
// Go to the loop check.
|
||||
}
|
||||
|
@ -304,7 +294,7 @@ public class OpenRegionHandler extends EventHandler {
|
|||
* .
|
||||
*/
|
||||
static class PostOpenDeployTasksThread extends Thread {
|
||||
private Exception exception = null;
|
||||
private Throwable exception = null;
|
||||
private final Server server;
|
||||
private final RegionServerServices services;
|
||||
private final HRegion region;
|
||||
|
@ -327,7 +317,7 @@ public class OpenRegionHandler extends EventHandler {
|
|||
} catch (KeeperException e) {
|
||||
server.abort("Exception running postOpenDeployTasks; region=" +
|
||||
this.region.getRegionInfo().getEncodedName(), e);
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Exception running postOpenDeployTasks; region=" +
|
||||
this.region.getRegionInfo().getEncodedName(), e);
|
||||
this.exception = e;
|
||||
|
@ -342,7 +332,7 @@ public class OpenRegionHandler extends EventHandler {
|
|||
/**
|
||||
* @return Null or the run exception; call this method after thread is done.
|
||||
*/
|
||||
Exception getException() {
|
||||
Throwable getException() {
|
||||
return this.exception;
|
||||
}
|
||||
}
|
||||
|
@ -552,8 +542,8 @@ public class OpenRegionHandler extends EventHandler {
|
|||
String encodedName = this.regionInfo.getEncodedName();
|
||||
try {
|
||||
this.version =
|
||||
ZKAssign.retransitionNodeOpening(server.getZooKeeper(),
|
||||
this.regionInfo, this.server.getServerName(), this.version, tomActivated);
|
||||
ZKAssign.confirmNodeOpening(server.getZooKeeper(),
|
||||
this.regionInfo, this.server.getServerName(), this.version);
|
||||
} catch (KeeperException e) {
|
||||
server.abort("Exception refreshing OPENING; region=" + encodedName +
|
||||
", context=" + context, e);
|
||||
|
|
|
@ -61,8 +61,6 @@ public class TestGlobalMemStoreSize {
|
|||
// Start the cluster
|
||||
LOG.info("Starting cluster");
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setInt("hbase.master.assignment.timeoutmonitor.period", 2000);
|
||||
conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 5000);
|
||||
TEST_UTIL = new HBaseTestingUtility(conf);
|
||||
TEST_UTIL.startMiniCluster(1, regionServerNum);
|
||||
cluster = TEST_UTIL.getHBaseCluster();
|
||||
|
|
|
@ -57,8 +57,6 @@ public class TestMasterRestartAfterDisablingTable {
|
|||
// Start the cluster
|
||||
log("Starting cluster");
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setInt("hbase.master.assignment.timeoutmonitor.period", 2000);
|
||||
conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 5000);
|
||||
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
|
||||
TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
|
|
|
@ -81,8 +81,6 @@ public class TestOpenedRegionHandler {
|
|||
log("Starting cluster");
|
||||
conf = HBaseConfiguration.create();
|
||||
resetConf = conf;
|
||||
conf.setInt("hbase.master.assignment.timeoutmonitor.period", 2000);
|
||||
conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 5000);
|
||||
TEST_UTIL = new HBaseTestingUtility(conf);
|
||||
TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
|
||||
String tableName = "testOpenedRegionHandlerOnMasterRestart";
|
||||
|
|
|
@ -66,8 +66,6 @@ public class TestRollingRestart {
|
|||
// Start the cluster
|
||||
log("Starting cluster");
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setInt("hbase.master.assignment.timeoutmonitor.period", 2000);
|
||||
conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 5000);
|
||||
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
|
||||
TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
|
|
|
@ -122,10 +122,6 @@ public class TestSplitTransactionOnCluster {
|
|||
|
||||
@BeforeClass public static void before() throws Exception {
|
||||
TESTING_UTIL.getConfiguration().setInt("hbase.balancer.period", 60000);
|
||||
// Needed because some tests have splits happening on RS that are killed
|
||||
// We don't want to wait 3min for the master to figure it out
|
||||
TESTING_UTIL.getConfiguration().setInt(
|
||||
"hbase.master.assignment.timeoutmonitor.timeout", 4000);
|
||||
TESTING_UTIL.startMiniCluster(NB_SERVERS);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue