HBASE-3019 Make bulk assignment on cluster startup run faster

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1003330 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2010-09-30 23:12:56 +00:00
parent 78f75e6c7f
commit 94682d09d3
7 changed files with 245 additions and 110 deletions

View File

@ -958,6 +958,7 @@ Release 0.21.0 - Unreleased
(dhruba borthakur via Stack) (dhruba borthakur via Stack)
HBASE-2646 Compaction requests should be prioritized to prevent blocking HBASE-2646 Compaction requests should be prioritized to prevent blocking
(Jeff Whiting via Stack) (Jeff Whiting via Stack)
HBASE-3019 Make bulk assignment on cluster startup run faster
NEW FEATURES NEW FEATURES
HBASE-1961 HBase EC2 scripts HBASE-1961 HBase EC2 scripts

View File

@ -302,6 +302,12 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion, Stoppable, Ab
*/ */
public void openRegion(final HRegionInfo region); public void openRegion(final HRegionInfo region);
/**
* Opens the specified regions.
* @param regions regions to open
*/
public void openRegions(final List<HRegionInfo> regions);
/** /**
* Closes the specified region. * Closes the specified region.
* @param region region to close * @param region region to close

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.master;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -33,6 +34,7 @@ import java.util.NavigableMap;
import java.util.Set; import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -65,6 +67,8 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* Manages and performs region assignment. * Manages and performs region assignment.
* <p> * <p>
@ -477,7 +481,8 @@ public class AssignmentManager extends ZooKeeperListener {
this.regionsInTransition.remove(regionInfo.getEncodedName()); this.regionsInTransition.remove(regionInfo.getEncodedName());
if (rs != null) { if (rs != null) {
this.regionsInTransition.notifyAll(); this.regionsInTransition.notifyAll();
LOG.warn("Asked online a region that was already in " + } else {
LOG.warn("Asked online a region that was not in " +
"regionsInTransition: " + rs); "regionsInTransition: " + rs);
} }
} }
@ -550,68 +555,85 @@ public class AssignmentManager extends ZooKeeperListener {
* @param regionName server to be assigned * @param regionName server to be assigned
*/ */
public void assign(HRegionInfo region) { public void assign(HRegionInfo region) {
// Grab the state of this region and synchronize on it RegionState state = addToRegionsInTransition(region);
String encodedName = region.getEncodedName();
RegionState state;
synchronized (regionsInTransition) {
state = regionsInTransition.get(encodedName);
if (state == null) {
state = new RegionState(region, RegionState.State.OFFLINE);
regionsInTransition.put(encodedName, state);
}
}
// This here gap between synchronizations looks like a hole but it should
// be ok because the assign below would protect against being called with
// a state instance that is not in the right 'state' -- St.Ack 20100920.
synchronized (state) { synchronized (state) {
assign(state); assign(state);
} }
} }
/**
* Bulk assign regions to <code>destination</code>. If we fail in any way,
* we'll abort the server.
* @param destination
* @param regions Regions to assign.
*/
public void assign(final HServerInfo destination,
final List<HRegionInfo> regions) {
LOG.debug("Bulk assigning " + regions.size() + " region(s) to " +
destination.getServerName());
List<RegionState> states = new ArrayList<RegionState>(regions.size());
synchronized (this.regionsInTransition) {
for (HRegionInfo region: regions) {
states.add(forceRegionStateToOffline(region));
}
}
// Presumption is that only this thread will be updating the state at this
// time; i.e. handlers on backend won't be trying to set it to OPEN, etc.
for (RegionState state: states) {
if (!setOfflineInZooKeeper(state)) {
return;
}
}
for (RegionState state: states) {
// Transition RegionState to PENDING_OPEN here in master; means we've
// sent the open. We're a little ahead of ourselves here since we've not
// yet sent out the actual open but putting this state change after the
// call to open risks our writing PENDING_OPEN after state has been moved
// to OPENING by the regionserver.
state.update(RegionState.State.PENDING_OPEN);
}
try {
// Send OPEN RPC. This can fail if the server on other end is is not up.
this.serverManager.sendRegionOpen(destination, regions);
} catch (Throwable t) {
this.master.abort("Failed assignment of regions to " + destination, t);
return;
}
LOG.debug("Bulk assigning done for " + destination.getServerName());
}
private RegionState addToRegionsInTransition(final HRegionInfo region) {
synchronized (regionsInTransition) {
return forceRegionStateToOffline(region);
}
}
/**
* Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}.
* Caller must hold lock on this.regionsInTransition.
* @param region
* @return Amended RegionState.
*/
private RegionState forceRegionStateToOffline(final HRegionInfo region) {
String encodedName = region.getEncodedName();
RegionState state = this.regionsInTransition.get(encodedName);
if (state == null) {
state = new RegionState(region, RegionState.State.OFFLINE);
this.regionsInTransition.put(encodedName, state);
}
return state;
}
/** /**
* Caller must hold lock on the passed <code>state</code> object. * Caller must hold lock on the passed <code>state</code> object.
* @param state * @param state
*/ */
private void assign(final RegionState state) { private void assign(final RegionState state) {
if (!state.isClosed() && !state.isOffline()) { if (!setOfflineInZooKeeper(state)) return;
LOG.info("Attempting to assign region but it is in transition and in " + RegionPlan plan = getRegionPlan(state);
"an unexpected state:" + state);
return;
} else {
state.update(RegionState.State.OFFLINE);
}
try { try {
if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(), LOG.debug("Assigning region " + state.getRegion().getRegionNameAsString() +
state.getRegion(), master.getServerName())) { " to " + plan.getDestination().getServerName());
LOG.warn("Attempted to create/force node into OFFLINE state before " +
"completing assignment but failed to do so");
return;
}
} catch (KeeperException e) {
master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
return;
}
// Pickup existing plan or make a new one
String encodedName = state.getRegion().getEncodedName();
RegionPlan plan;
synchronized(regionPlans) {
plan = regionPlans.get(encodedName);
if (plan == null) {
LOG.debug("No previous transition plan for " +
state.getRegion().getRegionNameAsString() +
" so generating a random one; " + serverManager.countOfRegionServers() +
" (online=" + serverManager.getOnlineServers().size() + ") available servers");
plan = new RegionPlan(state.getRegion(), null,
LoadBalancer.randomAssignment(serverManager.getOnlineServersList()));
regionPlans.put(encodedName, plan);
} else {
LOG.debug("Using preexisting plan=" + plan);
}
}
try {
LOG.debug("Assigning region " +
state.getRegion().getRegionNameAsString() + " to " +
plan.getDestination().getServerName());
// Send OPEN RPC. This can fail if the server on other end is is not up. // Send OPEN RPC. This can fail if the server on other end is is not up.
serverManager.sendRegionOpen(plan.getDestination(), state.getRegion()); serverManager.sendRegionOpen(plan.getDestination(), state.getRegion());
// Transition RegionState to PENDING_OPEN // Transition RegionState to PENDING_OPEN
@ -623,11 +645,61 @@ public class AssignmentManager extends ZooKeeperListener {
// Clean out plan we failed execute and one that doesn't look like it'll // Clean out plan we failed execute and one that doesn't look like it'll
// succeed anyways; we need a new plan! // succeed anyways; we need a new plan!
synchronized(regionPlans) { synchronized(regionPlans) {
this.regionPlans.remove(encodedName); this.regionPlans.remove(state.getRegion().getEncodedName());
} }
} }
} }
/**
* Set region as OFFLINED up in zookeeper
* @param state
* @return True if we succeeded, false otherwise (State was incorrect or failed
* updating zk).
*/
boolean setOfflineInZooKeeper(final RegionState state) {
if (!state.isClosed() && !state.isOffline()) {
new RuntimeException("Unexpected state trying to OFFLINE; " + state);
this.master.abort("Unexpected state trying to OFFLINE; " + state,
new IllegalStateException());
return false;
} else {
state.update(RegionState.State.OFFLINE);
}
try {
if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(),
state.getRegion(), master.getServerName())) {
LOG.warn("Attempted to create/force node into OFFLINE state before " +
"completing assignment but failed to do so for " + state);
return false;
}
} catch (KeeperException e) {
master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
return false;
}
return true;
}
RegionPlan getRegionPlan(final RegionState state) {
// Pickup existing plan or make a new one
String encodedName = state.getRegion().getEncodedName();
RegionPlan plan;
synchronized (regionPlans) {
plan = regionPlans.get(encodedName);
if (plan == null) {
LOG.debug("No previous transition plan for " +
state.getRegion().getRegionNameAsString() +
" so generating a random one; " + serverManager.countOfRegionServers() +
" (online=" + serverManager.getOnlineServers().size() + ") available servers");
plan = new RegionPlan(state.getRegion(), null,
LoadBalancer.randomAssignment(serverManager.getOnlineServersList()));
regionPlans.put(encodedName, plan);
} else {
LOG.debug("Using preexisting plan=" + plan);
}
}
return plan;
}
/** /**
* Unassigns the specified region. * Unassigns the specified region.
* <p> * <p>
@ -736,66 +808,71 @@ public class AssignmentManager extends ZooKeeperListener {
// Get all available servers // Get all available servers
List<HServerInfo> servers = serverManager.getOnlineServersList(); List<HServerInfo> servers = serverManager.getOnlineServersList();
LOG.info("Assigning " + allRegions.size() + " region(s) across " + LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
servers.size() + " server(s)"); servers.size() + " server(s)");
// Generate a cluster startup region placement plan // Generate a cluster startup region placement plan
Map<HServerInfo, List<HRegionInfo>> bulkPlan = Map<HServerInfo, List<HRegionInfo>> bulkPlan =
LoadBalancer.bulkAssignment(allRegions, servers); LoadBalancer.bulkAssignment(allRegions, servers);
// Now start a thread per server to run assignment. // Make a fixed thread count pool to run bulk assignments. Thought is that
for (Map.Entry<HServerInfo,List<HRegionInfo>> entry: bulkPlan.entrySet()) { // if a 1k cluster, running 1k bulk concurrent assignment threads will kill
Thread t = new BulkAssignServer(entry.getKey(), entry.getValue(), this.master); // master, HDFS or ZK?
t.start(); ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
} builder.setDaemon(true);
builder.setNameFormat(this.master.getServerName() + "-BulkAssigner-%1$d");
// Wait for no regions to be in transition builder.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
// Abort if exception of any kind.
master.abort("Uncaught exception bulk assigning in " + t.getName(), e);
}
});
int threadCount =
this.master.getConfiguration().getInt("hbase.bulk.assignment.threadpool.size", 20);
java.util.concurrent.ExecutorService pool =
Executors.newFixedThreadPool(threadCount, builder.build());
// Disable timing out regions in transition up in zk while bulk assigning.
this.timeoutMonitor.bulkAssign(true);
try { try {
waitUntilNoRegionsInTransition(); for (Map.Entry<HServerInfo, List<HRegionInfo>> e: bulkPlan.entrySet()) {
} catch (InterruptedException e) { pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue()));
LOG.error("Interrupted waiting for regions to be assigned", e); }
throw new IOException(e); // Wait for no regions to be in transition
try {
// How long to wait on empty regions-in-transition. When we timeout,
// we'll put back in place the monitor of R-I-T. It should do fixup
// if server crashed during bulk assign, etc.
long timeout =
this.master.getConfiguration().getInt("hbase.bulk.assignment.waiton.empty.rit", 10 * 60 * 1000);
waitUntilNoRegionsInTransition(timeout);
} catch (InterruptedException e) {
LOG.error("Interrupted waiting for regions to be assigned", e);
throw new IOException(e);
}
} finally {
// We're done with the pool. It'll exit when its done all in queue.
pool.shutdown();
// Reenable timing out regions in transition up in zi.
this.timeoutMonitor.bulkAssign(false);
} }
LOG.info("Bulk assigning done");
LOG.info("All user regions have been assigned");
} }
/** /**
* Class to run bulk assign to a single server. * Manage bulk assigning to a server.
*/ */
class BulkAssignServer extends Thread { class SingleServerBulkAssigner implements Runnable {
private final HServerInfo regionserver;
private final List<HRegionInfo> regions; private final List<HRegionInfo> regions;
private final HServerInfo server; SingleServerBulkAssigner(final HServerInfo regionserver,
private final Stoppable stopper; final List<HRegionInfo> regions) {
this.regionserver = regionserver;
BulkAssignServer(final HServerInfo server,
final List<HRegionInfo> regions, final Stoppable stopper) {
super("serverassign-" + server.getServerName());
setDaemon(true);
this.server = server;
this.regions = regions; this.regions = regions;
this.stopper = stopper;
} }
@Override @Override
public void run() { public void run() {
// Insert a plan for each region with 'server' as the target regionserver. assign(this.regionserver, this.regions);
// Below, we run through regions one at a time. The call to assign will
// move the region into the regionsInTransition which starts up a timer.
// if the region is not out of the regionsInTransition by a certain time,
// it will be reassigned. We don't want that to happen. So, do it this
// way a region at a time for now. Presumably the regionserver will put
// up a back pressure if opening a region takes time which is good since
// this will block our adding new regions to regionsInTransition. Later
// make it so we can send over a lump of regions in one rpc with the
// regionserver on remote side tickling zk on a period to prevent our
// regionsInTransition timing out. Currently its not possible given the
// Executor architecture on the regionserver side. St.Ack 20100920.
for (HRegionInfo region : regions) {
regionPlans.put(region.getEncodedName(), new RegionPlan(region, null, server));
assign(region);
if (this.stopper.isStopped()) break;
}
} }
} }
@ -835,12 +912,18 @@ public class AssignmentManager extends ZooKeeperListener {
* that if it returns without an exception that there was a period of time * that if it returns without an exception that there was a period of time
* with no regions in transition from the point-of-view of the in-memory * with no regions in transition from the point-of-view of the in-memory
* state of the Master. * state of the Master.
* @param timeout How long to wait on empty regions-in-transition.
* @throws InterruptedException * @throws InterruptedException
*/ */
public void waitUntilNoRegionsInTransition() throws InterruptedException { public void waitUntilNoRegionsInTransition(final long timeout)
synchronized(regionsInTransition) { throws InterruptedException {
while(regionsInTransition.size() > 0) { long startTime = System.currentTimeMillis();
regionsInTransition.wait(); long remaining = timeout;
synchronized (this.regionsInTransition) {
while(this.regionsInTransition.size() > 0 &&
!this.master.isStopped() && remaining > 0) {
this.regionsInTransition.wait(remaining);
remaining = timeout - (System.currentTimeMillis() - startTime);
} }
} }
} }
@ -849,14 +932,18 @@ public class AssignmentManager extends ZooKeeperListener {
* @return A copy of the Map of regions currently in transition. * @return A copy of the Map of regions currently in transition.
*/ */
public NavigableMap<String, RegionState> getRegionsInTransition() { public NavigableMap<String, RegionState> getRegionsInTransition() {
return new TreeMap<String, RegionState>(this.regionsInTransition); synchronized (this.regionsInTransition) {
return new TreeMap<String, RegionState>(this.regionsInTransition);
}
} }
/** /**
* @return True if regions in transition. * @return True if regions in transition.
*/ */
public boolean isRegionsInTransition() { public boolean isRegionsInTransition() {
return !this.regionsInTransition.isEmpty(); synchronized (this.regionsInTransition) {
return !this.regionsInTransition.isEmpty();
}
} }
/** /**
@ -956,11 +1043,11 @@ public class AssignmentManager extends ZooKeeperListener {
} }
/** /**
* Unsets the specified table as disabled (enables it). * Monitor to check for time outs on region transition operations
*/ */
public class TimeoutMonitor extends Chore { public class TimeoutMonitor extends Chore {
private final int timeout; private final int timeout;
private boolean bulkAssign = false;
/** /**
* Creates a periodic monitor to check for time outs on region transition * Creates a periodic monitor to check for time outs on region transition
@ -977,8 +1064,21 @@ public class AssignmentManager extends ZooKeeperListener {
this.timeout = timeout; this.timeout = timeout;
} }
/**
* @param bulkAssign If true, we'll suspend checking regions in transition
* up in zookeeper. If false, will reenable check.
* @return Old setting for bulkAssign.
*/
public boolean bulkAssign(final boolean bulkAssign) {
boolean result = this.bulkAssign;
this.bulkAssign = bulkAssign;
return result;
}
@Override @Override
protected void chore() { protected void chore() {
// If bulkAssign in progress, suspend checks
if (this.bulkAssign) return;
synchronized (regionsInTransition) { synchronized (regionsInTransition) {
// Iterate all regions in transition checking for time outs // Iterate all regions in transition checking for time outs
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
@ -1140,6 +1240,9 @@ public class AssignmentManager extends ZooKeeperListener {
unassign(plan.getRegionInfo()); unassign(plan.getRegionInfo());
} }
/**
* State of a Region while undergoing transitions.
*/
public static class RegionState implements Writable { public static class RegionState implements Writable {
private HRegionInfo region; private HRegionInfo region;

View File

@ -494,7 +494,7 @@ public class ServerManager {
* Open should not fail but can if server just crashed. * Open should not fail but can if server just crashed.
* <p> * <p>
* @param server server to open a region * @param server server to open a region
* @param regionName region to open * @param region region to open
*/ */
public void sendRegionOpen(HServerInfo server, HRegionInfo region) { public void sendRegionOpen(HServerInfo server, HRegionInfo region) {
HRegionInterface hri = getServerConnection(server); HRegionInterface hri = getServerConnection(server);
@ -506,6 +506,24 @@ public class ServerManager {
hri.openRegion(region); hri.openRegion(region);
} }
/**
* Sends an OPEN RPC to the specified server to open the specified region.
* <p>
* Open should not fail but can if server just crashed.
* <p>
* @param server server to open a region
* @param regions regions to open
*/
public void sendRegionOpen(HServerInfo server, List<HRegionInfo> regions) {
HRegionInterface hri = getServerConnection(server);
if (hri == null) {
LOG.warn("Attempting to send OPEN RPC to server " + server.getServerName()
+ " failed because no RPC connection found to this server");
return;
}
hri.openRegions(regions);
}
/** /**
* Sends an CLOSE RPC to the specified server to close the specified region. * Sends an CLOSE RPC to the specified server to close the specified region.
* <p> * <p>

View File

@ -85,7 +85,8 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf
@Override @Override
public void process() { public void process() {
LOG.debug("Handling OPENED event; deleting unassigned node"); LOG.debug("Handling OPENED event for " + this.regionInfo.getEncodedName() +
"; deleting unassigned node");
// TODO: should we check if this table was disabled and get it closed? // TODO: should we check if this table was disabled and get it closed?
// Remove region from in-memory transition and unassigned node from ZK // Remove region from in-memory transition and unassigned node from ZK
try { try {
@ -94,7 +95,7 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf
} catch (KeeperException e) { } catch (KeeperException e) {
server.abort("Error deleting OPENED node in ZK", e); server.abort("Error deleting OPENED node in ZK", e);
} }
assignmentManager.regionOnline(regionInfo, serverInfo); this.assignmentManager.regionOnline(regionInfo, serverInfo);
LOG.debug("Opened region " + regionInfo.getRegionNameAsString()); LOG.debug("Opened region " + regionInfo.getRegionNameAsString());
} }
} }

View File

@ -1926,6 +1926,12 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
} }
} }
@Override
public void openRegions(List<HRegionInfo> regions) {
LOG.info("Received request to open " + regions.size() + " region(s)");
for (HRegionInfo region: regions) openRegion(region);
}
@Override @Override
public boolean closeRegion(HRegionInfo region) public boolean closeRegion(HRegionInfo region)
throws NotServingRegionException { throws NotServingRegionException {
@ -2453,5 +2459,4 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
new HRegionServerCommandLine(regionServerClass).doMain(args); new HRegionServerCommandLine(regionServerClass).doMain(args);
} }
} }

View File

@ -35,6 +35,7 @@ import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
public class TestLogsCleaner { public class TestLogsCleaner {
@ -79,7 +80,7 @@ public class TestLogsCleaner {
public void tearDown() throws Exception { public void tearDown() throws Exception {
} }
/* REENALBE -- DISABLED UNTIL REPLICATION BROUGHT UP TO NEW MASTER @Test*/ @Ignore @Test /* REENABLE -- DISABLED UNTIL REPLICATION BROUGHT UP TO NEW MASTER */
public void testLogCleaning() throws Exception{ public void testLogCleaning() throws Exception{
Configuration c = TEST_UTIL.getConfiguration(); Configuration c = TEST_UTIL.getConfiguration();
Path oldLogDir = new Path(HBaseTestingUtility.getTestDir(), Path oldLogDir = new Path(HBaseTestingUtility.getTestDir(),