HBASE-3112 Enable and disable of table needs a bit of loving in new master

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1033237 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2010-11-09 21:28:09 +00:00
parent 022a7372d0
commit 5f2512facd
27 changed files with 1062 additions and 471 deletions

View File

@ -661,6 +661,7 @@ Release 0.90.0 - Unreleased
HBASE-3208 HLog.findMemstoresWithEditsOlderThan needs to look for edits
that are equal to too
HBASE-3141 Master RPC server needs to be started before an RS can check in
HBASE-3112 Enable and disable of table needs a bit of loving in new master
IMPROVEMENTS

View File

@ -137,6 +137,9 @@ public class HRegionInfo extends VersionedWritable implements WritableComparable
new HRegionInfo(1L, HTableDescriptor.META_TABLEDESC);
private byte [] endKey = HConstants.EMPTY_BYTE_ARRAY;
// This flag is in the parent of a split while the parent is still referenced
// by daughter regions. We USED to set this flag when we disabled a table
// but now table state is kept up in zookeeper as of 0.90.0 HBase.
private boolean offLine = false;
private long regionId = -1;
private transient byte [] regionName = HConstants.EMPTY_BYTE_ARRAY;
@ -533,7 +536,9 @@ public class HRegionInfo extends VersionedWritable implements WritableComparable
}
/**
* @param offLine set online - offline status
* The parent of a region split is offline while split daughters hold
* references to the parent. Offlined regions are closed.
* @param offLine Set online/offline status.
*/
public void setOffline(boolean offLine) {
this.offLine = offLine;

View File

@ -333,7 +333,7 @@ public class HBaseAdmin implements Abortable {
try {
getMaster().createTable(desc, splitKeys);
} catch (RemoteException e) {
throw RemoteExceptionHandler.decodeRemoteException(e);
throw e.unwrapRemoteException();
}
}
@ -427,37 +427,27 @@ public class HBaseAdmin implements Abortable {
LOG.info("Deleted " + Bytes.toString(tableName));
}
/**
* Brings a table on-line (enables it).
* Synchronous operation.
*
* @param tableName name of the table
* @throws IOException if a remote or network exception occurs
*/
public void enableTable(final String tableName) throws IOException {
public void enableTable(final String tableName)
throws IOException {
enableTable(Bytes.toBytes(tableName));
}
/**
* Brings a table on-line (enables it).
* Synchronous operation.
*
* Enable a table. May timeout. Use {@link #enableTableAsync(byte[])}
* and {@link #isTableEnabled(byte[])} instead.
* @param tableName name of the table
* @throws IOException if a remote or network exception occurs
* @see {@link #isTableEnabled(byte[])}
* @see {@link #disableTable(byte[])}
* @see {@link #enableTableAsync(byte[])}
*/
public void enableTable(final byte [] tableName) throws IOException {
isMasterRunning();
public void enableTable(final byte [] tableName)
throws IOException {
enableTableAsync(tableName);
// Wait until all regions are enabled
boolean enabled = false;
for (int tries = 0; tries < this.numRetries; tries++) {
try {
getMaster().enableTable(tableName);
} catch (RemoteException e) {
throw RemoteExceptionHandler.decodeRemoteException(e);
}
enabled = isTableEnabled(tableName);
if (enabled) {
break;
@ -470,11 +460,10 @@ public class HBaseAdmin implements Abortable {
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
// continue
}
if (LOG.isDebugEnabled()) {
LOG.debug("Wake. Waiting for all regions to be enabled from " +
Bytes.toString(tableName));
Thread.currentThread().interrupt();
// Do this conversion rather than let it out because do not want to
// change the method signature.
throw new IOException("Interrupted", e);
}
}
if (!enabled) {
@ -485,52 +474,83 @@ public class HBaseAdmin implements Abortable {
}
/**
* Disables a table (takes it off-line) If it is being served, the master
* will tell the servers to stop serving it.
* Synchronous operation.
*
* Brings a table on-line (enables it). Method returns immediately though
* enable of table may take some time to complete, especially if the table
* is large (All regions are opened as part of enabling process). Check
* {@link #isTableEnabled(byte[])} to learn when table is fully online. If
* table is taking too long to online, check server logs.
* @param tableName
* @throws IOException
* @since 0.90.0
*/
public void enableTableAsync(final byte [] tableName)
throws IOException {
isMasterRunning();
try {
getMaster().enableTable(tableName);
} catch (RemoteException e) {
throw e.unwrapRemoteException();
}
LOG.info("Started enable of " + Bytes.toString(tableName));
}
/**
* Starts the disable of a table. If it is being served, the master
* will tell the servers to stop serving it. This method returns immediately.
* The disable of a table can take some time if the table is large (all
* regions are closed as part of table disable operation).
* Call {@link #isTableDisabled(byte[])} to check for when disable completes.
* If table is taking too long to online, check server logs.
* @param tableName name of table
* @throws IOException if a remote or network exception occurs
* @see {@link #isTableDisabled(byte[])}
* @see {@link #isTableEnabled(byte[])}
* @since 0.90.0
*/
public void disableTable(final String tableName) throws IOException {
public void disableTableAsync(final byte [] tableName) throws IOException {
isMasterRunning();
try {
getMaster().disableTable(tableName);
} catch (RemoteException e) {
throw e.unwrapRemoteException();
}
LOG.info("Started disable of " + Bytes.toString(tableName));
}
public void disableTable(final String tableName)
throws IOException {
disableTable(Bytes.toBytes(tableName));
}
/**
* Disables a table (takes it off-line) If it is being served, the master
* will tell the servers to stop serving it.
* Synchronous operation.
*
* @param tableName name of table
* @throws IOException if a remote or network exception occurs
* Disable table and wait on completion. May timeout. Use
* {@link #disableTableAsync(byte[])} and {@link #isTableDisabled(String)}
* instead.
* @param tableName
* @throws IOException
*/
public void disableTable(final byte [] tableName) throws IOException {
isMasterRunning();
// Wait until all regions are disabled
public void disableTable(final byte [] tableName)
throws IOException {
disableTableAsync(tableName);
// Wait until table is disabled
boolean disabled = false;
for (int tries = 0; tries < this.numRetries; tries++) {
try {
getMaster().disableTable(tableName);
} catch (RemoteException e) {
throw RemoteExceptionHandler.decodeRemoteException(e);
}
disabled = isTableDisabled(tableName);
if (disabled) {
break;
}
long sleep = getPauseTime(tries);
if (LOG.isDebugEnabled()) {
LOG.debug("Sleep. Waiting for all regions to be disabled from " +
Bytes.toString(tableName));
LOG.debug("Sleeping= " + sleep + "ms, waiting for all regions to be " +
"disabled in " + Bytes.toString(tableName));
}
try {
Thread.sleep(getPauseTime(tries));
Thread.sleep(sleep);
} catch (InterruptedException e) {
// continue
}
if (LOG.isDebugEnabled()) {
LOG.debug("Wake. Waiting for all regions to be disabled from " +
Bytes.toString(tableName));
// Do this conversion rather than let it out because do not want to
// change the method signature.
Thread.currentThread().interrupt();
throw new IOException("Interrupted", e);
}
}
if (!disabled) {
@ -557,6 +577,15 @@ public class HBaseAdmin implements Abortable {
return connection.isTableEnabled(tableName);
}
/**
* @param tableName name of table to check
* @return true if table is off-line
* @throws IOException if a remote or network exception occurs
*/
public boolean isTableDisabled(final String tableName) throws IOException {
return isTableDisabled(Bytes.toBytes(tableName));
}
/**
* @param tableName name of table to check
* @return true if table is off-line

View File

@ -62,7 +62,7 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
@ -477,19 +477,20 @@ public class HConnectionManager {
/*
* @param True if table is online
*/
private boolean testTableOnlineState(byte[] tableName, boolean online)
private boolean testTableOnlineState(byte [] tableName, boolean online)
throws IOException {
if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
// The root region is always enabled
return true;
return online;
}
String tableNameStr = Bytes.toString(tableName);
try {
List<String> tables = ZKTableDisable.getDisabledTables(this.zooKeeper);
String searchStr = Bytes.toString(tableName);
boolean disabled = tables.contains(searchStr);
return online? !disabled: disabled;
if (online) {
return ZKTable.isEnabledTable(this.zooKeeper, tableNameStr);
}
return ZKTable.isDisabledTable(this.zooKeeper, tableNameStr);
} catch (KeeperException e) {
throw new IOException("Failed listing disabled tables", e);
throw new IOException("Enable/Disable failed", e);
}
}

View File

@ -22,12 +22,9 @@ package org.apache.hadoop.hbase.master;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -38,7 +35,6 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@ -56,9 +52,9 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;
import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
@ -67,11 +63,11 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.AsyncCallback;
@ -79,8 +75,6 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Manages and performs region assignment.
* <p>
@ -115,9 +109,7 @@ public class AssignmentManager extends ZooKeeperListener {
final ConcurrentNavigableMap<String, RegionPlan> regionPlans =
new ConcurrentSkipListMap<String, RegionPlan>();
/** Set of tables that have been disabled. */
private final Set<String> disabledTables =
Collections.synchronizedSet(new HashSet<String>());
private final ZKTable zkTable;
/**
* Server to regions assignment map.
@ -150,9 +142,11 @@ public class AssignmentManager extends ZooKeeperListener {
* @param serverManager
* @param catalogTracker
* @param service
* @throws KeeperException
*/
public AssignmentManager(Server master, ServerManager serverManager,
CatalogTracker catalogTracker, final ExecutorService service) {
CatalogTracker catalogTracker, final ExecutorService service)
throws KeeperException {
super(master.getZooKeeper());
this.master = master;
this.serverManager = serverManager;
@ -160,11 +154,21 @@ public class AssignmentManager extends ZooKeeperListener {
this.executorService = service;
Configuration conf = master.getConfiguration();
this.timeoutMonitor = new TimeoutMonitor(
conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000),
master,
conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 30000));
conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000),
master,
conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 30000));
Threads.setDaemonThreadRunning(timeoutMonitor,
master.getServerName() + ".timeoutMonitor");
master.getServerName() + ".timeoutMonitor");
this.zkTable = new ZKTable(this.master.getZooKeeper());
}
/**
* @return Instance of ZKTable.
*/
public ZKTable getZKTable() {
// These are 'expensive' to make involving trip to zk ensemble so allow
// sharing.
return this.zkTable;
}
/**
@ -200,8 +204,6 @@ public class AssignmentManager extends ZooKeeperListener {
// Returns servers who have not checked in (assumed dead) and their regions
Map<HServerInfo,List<Pair<HRegionInfo,Result>>> deadServers =
rebuildUserRegions();
// Pickup any disabled tables from ZK
rebuildDisabledTables();
// Process list of dead servers
processDeadServers(deadServers);
// Check existing regions in transition
@ -593,14 +595,10 @@ public class AssignmentManager extends ZooKeeperListener {
public void setOffline(HRegionInfo regionInfo) {
synchronized (this.regions) {
HServerInfo serverInfo = this.regions.remove(regionInfo);
if (serverInfo != null) {
List<HRegionInfo> serverRegions = this.servers.get(serverInfo);
if (!serverRegions.remove(regionInfo)) {
LOG.warn("Asked offline a region that was not on expected server: " +
regionInfo + ", " + serverInfo.getServerName());
}
} else {
LOG.warn("Asked offline a region that was not online: " + regionInfo);
if (serverInfo == null) return;
List<HRegionInfo> serverRegions = this.servers.get(serverInfo);
if (!serverRegions.remove(regionInfo)) {
LOG.warn("No " + regionInfo + " on " + serverInfo);
}
}
}
@ -651,9 +649,10 @@ public class AssignmentManager extends ZooKeeperListener {
public void assign(HRegionInfo region, boolean setOfflineInZK,
boolean forceNewPlan) {
String tableName = region.getTableDesc().getNameAsString();
if (isTableDisabled(tableName)) {
LOG.info("Table " + tableName + " disabled; skipping assign of " +
region.getRegionNameAsString());
boolean disabled = this.zkTable.isDisabledTable(tableName);
if (disabled || this.zkTable.isDisablingTable(tableName)) {
LOG.info("Table " + tableName + (disabled? " disabled;": " disabling;") +
" skipping assign of " + region.getRegionNameAsString());
offlineDisabledRegion(region);
return;
}
@ -674,7 +673,7 @@ public class AssignmentManager extends ZooKeeperListener {
* @param destination
* @param regions Regions to assign.
*/
public void assign(final HServerInfo destination,
void assign(final HServerInfo destination,
final List<HRegionInfo> regions) {
LOG.debug("Bulk assigning " + regions.size() + " region(s) to " +
destination.getServerName());
@ -958,7 +957,7 @@ public class AssignmentManager extends ZooKeeperListener {
* Updates the RegionState and sends the CLOSE RPC.
* <p>
* If a RegionPlan is already set, it will remain. If this is being used
* to disable a table, be sure to use {@link #disableTable(String)} to ensure
* to disable a table, be sure to use {@link #setDisabledTable(String)} to ensure
* regions are not onlined after being closed.
*
* @param regionName server to be unassigned
@ -973,7 +972,7 @@ public class AssignmentManager extends ZooKeeperListener {
* Updates the RegionState and sends the CLOSE RPC.
* <p>
* If a RegionPlan is already set, it will remain. If this is being used
* to disable a table, be sure to use {@link #disableTable(String)} to ensure
* to disable a table, be sure to use {@link #setDisabledTable(String)} to ensure
* regions are not onlined after being closed.
*
* @param regionName server to be unassigned
@ -1104,23 +1103,23 @@ public class AssignmentManager extends ZooKeeperListener {
* This is a synchronous call and will return once every region has been
* assigned. If anything fails, an exception is thrown and the cluster
* should be shutdown.
* @throws InterruptedException
* @throws IOException
*/
public void assignAllUserRegions() throws IOException {
Map<HServerInfo, List<HRegionInfo>> bulkPlan = null;
public void assignAllUserRegions() throws IOException, InterruptedException {
// Get all available servers
List<HServerInfo> servers = serverManager.getOnlineServersList();
// Scan META for all user regions, skipping any disabled tables
Map<HRegionInfo,HServerAddress> allRegions =
MetaReader.fullScan(catalogTracker, disabledTables);
MetaReader.fullScan(catalogTracker, this.zkTable.getDisabledTables());
if (allRegions == null || allRegions.isEmpty()) return;
// Determine what type of assignment to do on startup
boolean retainAssignment = master.getConfiguration().getBoolean(
"hbase.master.startup.retainassign", true);
boolean retainAssignment = master.getConfiguration().
getBoolean("hbase.master.startup.retainassign", true);
Map<HServerInfo, List<HRegionInfo>> bulkPlan = null;
if (retainAssignment) {
// Reuse existing assignment info
bulkPlan = LoadBalancer.retainAssignment(allRegions, servers);
@ -1129,71 +1128,107 @@ public class AssignmentManager extends ZooKeeperListener {
bulkPlan = LoadBalancer.roundRobinAssignment(
new ArrayList<HRegionInfo>(allRegions.keySet()), servers);
}
LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
servers.size() + " server(s)");
servers.size() + " server(s), retainAssignment=" + retainAssignment);
// Make a fixed thread count pool to run bulk assignments. Thought is that
// if a 1k cluster, running 1k bulk concurrent assignment threads will kill
// master, HDFS or ZK?
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
builder.setDaemon(true);
builder.setNameFormat(this.master.getServerName() + "-BulkAssigner-%1$d");
builder.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
// Abort if exception of any kind.
master.abort("Uncaught exception bulk assigning in " + t.getName(), e);
}
});
int threadCount =
this.master.getConfiguration().getInt("hbase.bulk.assignment.threadpool.size", 20);
java.util.concurrent.ExecutorService pool =
Executors.newFixedThreadPool(threadCount, builder.build());
// Disable timing out regions in transition up in zk while bulk assigning.
this.timeoutMonitor.bulkAssign(true);
try {
for (Map.Entry<HServerInfo, List<HRegionInfo>> e: bulkPlan.entrySet()) {
pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue()));
}
// Wait for no regions to be in transition
try {
// How long to wait on empty regions-in-transition. When we timeout,
// we'll put back in place the monitor of R-I-T. It should do fixup
// if server crashed during bulk assign, etc.
long timeout =
this.master.getConfiguration().getInt("hbase.bulk.assignment.waiton.empty.rit", 10 * 60 * 1000);
waitUntilNoRegionsInTransition(timeout);
} catch (InterruptedException e) {
LOG.error("Interrupted waiting for regions to be assigned", e);
throw new IOException(e);
}
} finally {
// We're done with the pool. It'll exit when its done all in queue.
pool.shutdown();
// Reenable timing out regions in transition up in zi.
this.timeoutMonitor.bulkAssign(false);
}
// Use fixed count thread pool assigning.
BulkAssigner ba = new BulkStartupAssigner(this.master, bulkPlan, this);
ba.bulkAssign();
LOG.info("Bulk assigning done");
}
/**
* Run bulk assign on startup.
*/
static class BulkStartupAssigner extends BulkAssigner {
private final Map<HServerInfo, List<HRegionInfo>> bulkPlan;
private final AssignmentManager assignmentManager;
BulkStartupAssigner(final Server server,
final Map<HServerInfo, List<HRegionInfo>> bulkPlan,
final AssignmentManager am) {
super(server);
this.bulkPlan = bulkPlan;
this.assignmentManager = am;
}
@Override
public boolean bulkAssign() throws InterruptedException {
// Disable timing out regions in transition up in zk while bulk assigning.
this.assignmentManager.timeoutMonitor.bulkAssign(true);
try {
return super.bulkAssign();
} finally {
// Reenable timing out regions in transition up in zi.
this.assignmentManager.timeoutMonitor.bulkAssign(false);
}
}
@Override
protected String getThreadNamePrefix() {
return super.getThreadNamePrefix() + "-startup";
}
@Override
protected void populatePool(java.util.concurrent.ExecutorService pool) {
for (Map.Entry<HServerInfo, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
this.assignmentManager));
}
}
protected boolean waitUntilDone(final long timeout)
throws InterruptedException {
return this.assignmentManager.waitUntilNoRegionsInTransition(timeout);
}
}
/**
* Manage bulk assigning to a server.
*/
class SingleServerBulkAssigner implements Runnable {
static class SingleServerBulkAssigner implements Runnable {
private final HServerInfo regionserver;
private final List<HRegionInfo> regions;
private final AssignmentManager assignmentManager;
SingleServerBulkAssigner(final HServerInfo regionserver,
final List<HRegionInfo> regions) {
final List<HRegionInfo> regions, final AssignmentManager am) {
this.regionserver = regionserver;
this.regions = regions;
this.assignmentManager = am;
}
@Override
public void run() {
assign(this.regionserver, this.regions);
this.assignmentManager.assign(this.regionserver, this.regions);
}
}
/**
* Wait until no regions in transition.
* @param timeout How long to wait.
* @return True if nothing in regions in transition.
* @throws InterruptedException
*/
boolean waitUntilNoRegionsInTransition(final long timeout)
throws InterruptedException {
// Blocks until there are no regions in transition. It is possible that
// there
// are regions in transition immediately after this returns but guarantees
// that if it returns without an exception that there was a period of time
// with no regions in transition from the point-of-view of the in-memory
// state of the Master.
long startTime = System.currentTimeMillis();
long remaining = timeout;
synchronized (regionsInTransition) {
while (regionsInTransition.size() > 0 && !this.master.isStopped()
&& remaining > 0) {
regionsInTransition.wait(remaining);
remaining = timeout - (System.currentTimeMillis() - startTime);
}
}
return regionsInTransition.isEmpty();
}
/**
* Rebuild the list of user regions and assignment information.
* <p>
@ -1290,28 +1325,6 @@ public class AssignmentManager extends ZooKeeperListener {
hris.add(hri);
}
/**
* Blocks until there are no regions in transition. It is possible that there
* are regions in transition immediately after this returns but guarantees
* that if it returns without an exception that there was a period of time
* with no regions in transition from the point-of-view of the in-memory
* state of the Master.
* @param timeout How long to wait on empty regions-in-transition.
* @throws InterruptedException
*/
public void waitUntilNoRegionsInTransition(final long timeout)
throws InterruptedException {
long startTime = System.currentTimeMillis();
long remaining = timeout;
synchronized (this.regionsInTransition) {
while(this.regionsInTransition.size() > 0 &&
!this.master.isStopped() && remaining > 0) {
this.regionsInTransition.wait(remaining);
remaining = timeout - (System.currentTimeMillis() - startTime);
}
}
}
/**
* @return A copy of the Map of regions currently in transition.
*/
@ -1370,18 +1383,7 @@ public class AssignmentManager extends ZooKeeperListener {
}
/**
* Checks if the specified table has been disabled by the user.
* @param tableName
* @return
*/
public boolean isTableDisabled(String tableName) {
synchronized(disabledTables) {
return disabledTables.contains(tableName);
}
}
/**
* Wait on regions to clean regions-in-transition.
* Wait on region to clear regions-in-transition.
* @param hri Region to wait on.
* @throws IOException
*/
@ -1401,82 +1403,22 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
/**
* Checks if the table of the specified region has been disabled by the user.
* @param regionName
* @return
*/
public boolean isTableOfRegionDisabled(byte [] regionName) {
return isTableDisabled(Bytes.toString(
HRegionInfo.getTableName(regionName)));
}
/**
* Sets the specified table to be disabled.
* @param tableName table to be disabled
*/
public void disableTable(String tableName) {
synchronized(disabledTables) {
if(!isTableDisabled(tableName)) {
disabledTables.add(tableName);
try {
ZKTableDisable.disableTable(master.getZooKeeper(), tableName);
} catch (KeeperException e) {
LOG.warn("ZK error setting table as disabled", e);
}
}
}
}
/**
* Unsets the specified table from being disabled.
* <p>
* This operation only acts on the in-memory
* @param tableName table to be undisabled
*/
public void undisableTable(String tableName) {
synchronized(disabledTables) {
if(isTableDisabled(tableName)) {
disabledTables.remove(tableName);
try {
ZKTableDisable.undisableTable(master.getZooKeeper(), tableName);
} catch (KeeperException e) {
LOG.warn("ZK error setting table as disabled", e);
}
}
}
}
/**
* Rebuild the set of disabled tables from zookeeper. Used during master
* failover.
*/
private void rebuildDisabledTables() {
synchronized(disabledTables) {
List<String> disabledTables;
try {
disabledTables = ZKTableDisable.getDisabledTables(master.getZooKeeper());
} catch (KeeperException e) {
LOG.warn("ZK error getting list of disabled tables", e);
return;
}
if(!disabledTables.isEmpty()) {
LOG.info("Rebuilt list of " + disabledTables.size() + " disabled " +
"tables from zookeeper");
this.disabledTables.addAll(disabledTables);
}
}
}
/**
* Gets the online regions of the specified table.
* This method looks at the in-memory state. It does not go to <code>.META.</code>.
* Only returns <em>online</em> regions. If a region on this table has been
* closed during a disable, etc., it will be included in the returned list.
* So, the returned list may not necessarily be ALL regions in this table, its
* all the ONLINE regions in the table.
* @param tableName
* @return
* @return Online regions from <code>tableName</code>
*/
public List<HRegionInfo> getRegionsOfTable(byte[] tableName) {
List<HRegionInfo> tableRegions = new ArrayList<HRegionInfo>();
for(HRegionInfo regionInfo : regions.tailMap(new HRegionInfo(
new HTableDescriptor(tableName), null, null)).keySet()) {
HRegionInfo boundary =
new HRegionInfo(new HTableDescriptor(tableName), null, null);
for (HRegionInfo regionInfo: this.regions.tailMap(boundary).keySet()) {
if(Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
tableRegions.add(regionInfo);
} else {

View File

@ -795,13 +795,13 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
}
public void enableTable(final byte [] tableName) throws IOException {
new EnableTableHandler(this, tableName, catalogTracker, assignmentManager)
.process();
this.executorService.submit(new EnableTableHandler(this, tableName,
catalogTracker, assignmentManager));
}
public void disableTable(final byte [] tableName) throws IOException {
new DisableTableHandler(this, tableName, catalogTracker, assignmentManager)
.process();
this.executorService.submit(new DisableTableHandler(this, tableName,
catalogTracker, assignmentManager));
}
/**
@ -857,7 +857,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
if (!MetaReader.tableExists(getCatalogTracker(), tableNameStr)) {
throw new TableNotFoundException(tableNameStr);
}
if (!getAssignmentManager().isTableDisabled(Bytes.toString(tableName))) {
if (!getAssignmentManager().getZKTable().
isDisabledTable(Bytes.toString(tableName))) {
throw new TableNotDisabledException(tableName);
}
}

View File

@ -404,7 +404,8 @@ public class LoadBalancer {
assignments.put(server, new ArrayList<HRegionInfo>());
}
for (Map.Entry<HRegionInfo, HServerAddress> region : regions.entrySet()) {
HServerInfo server = serverMap.get(region.getValue());
HServerAddress hsa = region.getValue();
HServerInfo server = hsa == null? null: serverMap.get(hsa);
if (server != null) {
assignments.get(server).add(region.getKey());
} else {

View File

@ -36,10 +36,8 @@ import org.apache.hadoop.hbase.master.AssignmentManager;
*/
public class ClosedRegionHandler extends EventHandler implements TotesHRegionInfo {
private static final Log LOG = LogFactory.getLog(ClosedRegionHandler.class);
private final AssignmentManager assignmentManager;
private final HRegionInfo regionInfo;
private final ClosedPriority priority;
private enum ClosedPriority {
@ -84,7 +82,8 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf
public void process() {
LOG.debug("Handling CLOSED event for " + regionInfo.getEncodedName());
// Check if this table is being disabled or not
if (assignmentManager.isTableOfRegionDisabled(regionInfo.getRegionName())) {
if (this.assignmentManager.getZKTable().
isDisabledTable(this.regionInfo.getTableDesc().getNameAsString())) {
assignmentManager.offlineDisabledRegion(regionInfo);
return;
}

View File

@ -31,20 +31,20 @@ import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
import org.apache.zookeeper.KeeperException;
public class DeleteTableHandler extends TableEventHandler {
private static final Log LOG = LogFactory.getLog(DeleteTableHandler.class);
public DeleteTableHandler(byte [] tableName, Server server,
final MasterServices masterServices) throws IOException {
final MasterServices masterServices)
throws IOException {
super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices);
}
@Override
protected void handleTableOperation(List<HRegionInfo> regions)
throws IOException {
throws IOException, KeeperException {
AssignmentManager am = this.masterServices.getAssignmentManager();
long waitTime = server.getConfiguration().
getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
@ -73,6 +73,6 @@ public class DeleteTableHandler extends TableEventHandler {
// If entry for this table in zk, and up in AssignmentManager, remove it.
// Call to undisableTable does this. TODO: Make a more formal purge table.
am.undisableTable(Bytes.toString(tableName));
am.getZKTable().setEnabledTable(Bytes.toString(tableName));
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master.handler;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -31,12 +32,15 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.BulkAssigner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
/**
* Handler to run disable of a table.
*/
public class DisableTableHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(DisableTableHandler.class);
private final byte [] tableName;
private final String tableNameStr;
private final AssignmentManager assignmentManager;
@ -52,7 +56,7 @@ public class DisableTableHandler extends EventHandler {
// TODO: do we want to keep this in-memory as well? i guess this is
// part of old master rewrite, schema to zk to check for table
// existence and such
if(!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
throw new TableNotFoundException(Bytes.toString(tableName));
}
}
@ -60,32 +64,90 @@ public class DisableTableHandler extends EventHandler {
@Override
public void process() {
try {
LOG.info("Attemping to disable the table " + this.tableNameStr);
LOG.info("Attemping to disable table " + this.tableNameStr);
handleDisableTable();
} catch (IOException e) {
LOG.error("Error trying to disable the table " + this.tableNameStr, e);
LOG.error("Error trying to disable table " + this.tableNameStr, e);
} catch (KeeperException e) {
LOG.error("Error trying to disable table " + this.tableNameStr, e);
}
}
private void handleDisableTable() throws IOException {
if (this.assignmentManager.isTableDisabled(this.tableNameStr)) {
LOG.info("Table " + tableNameStr + " is already disabled; skipping disable");
private void handleDisableTable() throws IOException, KeeperException {
if (this.assignmentManager.getZKTable().isDisabledTable(this.tableNameStr)) {
LOG.info("Table " + tableNameStr + " already disabled; skipping disable");
return;
}
// Set the table as disabled so it doesn't get re-onlined
assignmentManager.disableTable(this.tableNameStr);
// Get the online regions of this table.
// TODO: What if region splitting at the time we get this listing?
// TODO: Remove offline flag from HRI
// TODO: Confirm we have parallel closing going on.
List<HRegionInfo> regions = assignmentManager.getRegionsOfTable(tableName);
// Unassign the online regions
for(HRegionInfo region: regions) {
assignmentManager.unassign(region);
// Set table disabling flag up in zk.
this.assignmentManager.getZKTable().setDisablingTable(this.tableNameStr);
boolean done = false;
while (true) {
// Get list of online regions that are of this table. Regions that are
// already closed will not be included in this list; i.e. the returned
// list is not ALL regions in a table, its all online regions according to
// the in-memory state on this master.
final List<HRegionInfo> regions =
this.assignmentManager.getRegionsOfTable(tableName);
if (regions.size() == 0) {
done = true;
break;
}
LOG.info("Offlining " + regions.size() + " regions.");
BulkDisabler bd = new BulkDisabler(this.server, regions);
try {
if (bd.bulkAssign()) {
done = true;
break;
}
} catch (InterruptedException e) {
LOG.warn("Disable was interrupted");
// Preserve the interrupt.
Thread.currentThread().interrupt();
break;
}
}
// Wait on table's regions to clear region in transition.
for (HRegionInfo region: regions) {
this.assignmentManager.waitOnRegionToClearRegionsInTransition(region);
// Flip the table to disabled if success.
if (done) this.assignmentManager.getZKTable().setDisabledTable(this.tableNameStr);
LOG.info("Disabled table is done=" + done);
}
/**
* Run bulk disable.
*/
class BulkDisabler extends BulkAssigner {
private final List<HRegionInfo> regions;
BulkDisabler(final Server server, final List<HRegionInfo> regions) {
super(server);
this.regions = regions;
}
@Override
protected void populatePool(ExecutorService pool) {
for (HRegionInfo region: regions) {
if (assignmentManager.isRegionInTransition(region) != null) continue;
final HRegionInfo hri = region;
pool.execute(new Runnable() {
public void run() {
assignmentManager.unassign(hri);
}
});
}
}
@Override
protected boolean waitUntilDone(long timeout)
throws InterruptedException {
long startTime = System.currentTimeMillis();
long remaining = timeout;
List<HRegionInfo> regions = null;
while (!server.isStopped() && remaining > 0) {
Thread.sleep(1000);
regions = assignmentManager.getRegionsOfTable(tableName);
if (regions.isEmpty()) break;
remaining = timeout - (System.currentTimeMillis() - startTime);
}
return regions != null && regions.isEmpty();
}
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master.handler;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -31,12 +32,15 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.BulkAssigner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
/**
* Handler to run enable of a table.
*/
public class EnableTableHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(EnableTableHandler.class);
private final byte [] tableName;
private final String tableNameStr;
private final AssignmentManager assignmentManager;
@ -51,7 +55,7 @@ public class EnableTableHandler extends EventHandler {
this.ct = catalogTracker;
this.assignmentManager = assignmentManager;
// Check if table exists
if(!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
throw new TableNotFoundException(Bytes.toString(tableName));
}
}
@ -63,25 +67,113 @@ public class EnableTableHandler extends EventHandler {
handleEnableTable();
} catch (IOException e) {
LOG.error("Error trying to enable the table " + this.tableNameStr, e);
} catch (KeeperException e) {
LOG.error("Error trying to enable the table " + this.tableNameStr, e);
}
}
private void handleEnableTable() throws IOException {
if (!this.assignmentManager.isTableDisabled(this.tableNameStr)) {
LOG.info("Table " + tableNameStr + " is not disabled; skipping enable");
private void handleEnableTable() throws IOException, KeeperException {
if (this.assignmentManager.getZKTable().isEnabledTable(this.tableNameStr)) {
LOG.info("Table " + tableNameStr + " is already enabled; skipping enable");
return;
}
// Get the regions of this table
List<HRegionInfo> regions = MetaReader.getTableRegions(this.ct, tableName);
// Set the table as disabled so it doesn't get re-onlined
assignmentManager.undisableTable(this.tableNameStr);
// Verify all regions of table are disabled
for (HRegionInfo region : regions) {
assignmentManager.assign(region, true);
// I could check table is disabling and if so, not enable but require
// that user first finish disabling but that might be obnoxious.
// Set table enabling flag up in zk.
this.assignmentManager.getZKTable().setEnablingTable(this.tableNameStr);
boolean done = false;
while (true) {
// Get the regions of this table. We're done when all listed
// tables are onlined.
List<HRegionInfo> regionsInMeta =
MetaReader.getTableRegions(this.ct, tableName);
int countOfRegionsInTable = regionsInMeta.size();
List<HRegionInfo> regions = regionsToAssign(regionsInMeta);
if (regions.size() == 0) {
done = true;
break;
}
LOG.info("Table has " + countOfRegionsInTable + " regions of which " +
regions.size() + " are online.");
BulkEnabler bd = new BulkEnabler(this.server, regions,
countOfRegionsInTable);
try {
if (bd.bulkAssign()) {
done = true;
break;
}
} catch (InterruptedException e) {
LOG.warn("Enable was interrupted");
// Preserve the interrupt.
Thread.currentThread().interrupt();
break;
}
}
// Wait on table's regions to clear region in transition.
for (HRegionInfo region: regions) {
this.assignmentManager.waitOnRegionToClearRegionsInTransition(region);
// Flip the table to disabled.
if (done) this.assignmentManager.getZKTable().setEnabledTable(this.tableNameStr);
LOG.info("Enabled table is done=" + done);
}
/**
* @param regionsInMeta This datastructure is edited by this method.
* @return The <code>regionsInMeta</code> list minus the regions that have
* been onlined; i.e. List of regions that need onlining.
* @throws IOException
*/
private List<HRegionInfo> regionsToAssign(final List<HRegionInfo> regionsInMeta)
throws IOException {
final List<HRegionInfo> onlineRegions =
this.assignmentManager.getRegionsOfTable(tableName);
regionsInMeta.removeAll(onlineRegions);
return regionsInMeta;
}
/**
* Run bulk enable.
*/
class BulkEnabler extends BulkAssigner {
private final List<HRegionInfo> regions;
// Count of regions in table at time this assign was launched.
private final int countOfRegionsInTable;
BulkEnabler(final Server server, final List<HRegionInfo> regions,
final int countOfRegionsInTable) {
super(server);
this.regions = regions;
this.countOfRegionsInTable = countOfRegionsInTable;
}
@Override
protected void populatePool(ExecutorService pool) {
for (HRegionInfo region: regions) {
if (assignmentManager.isRegionInTransition(region) != null) continue;
final HRegionInfo hri = region;
pool.execute(new Runnable() {
public void run() {
assignmentManager.assign(hri, true);
}
});
}
}
@Override
protected boolean waitUntilDone(long timeout)
throws InterruptedException {
long startTime = System.currentTimeMillis();
long remaining = timeout;
List<HRegionInfo> regions = null;
while (!server.isStopped() && remaining > 0) {
Thread.sleep(1000);
regions = assignmentManager.getRegionsOfTable(tableName);
if (isDone(regions)) break;
remaining = timeout - (System.currentTimeMillis() - startTime);
}
return isDone(regions);
}
private boolean isDone(final List<HRegionInfo> regions) {
return regions != null && regions.size() >= this.countOfRegionsInTable;
}
}
}

View File

@ -92,7 +92,7 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf
regionInfo.getEncodedName() + ")", e);
}
this.assignmentManager.regionOnline(regionInfo, serverInfo);
if (assignmentManager.isTableDisabled(
if (this.assignmentManager.getZKTable().isDisabledTable(
regionInfo.getTableDesc().getNameAsString())) {
LOG.debug("Opened region " + regionInfo.getRegionNameAsString() + " but "
+ "this table is disabled, triggering close of region");

View File

@ -167,7 +167,7 @@ public class ServerShutdownHandler extends EventHandler {
AssignmentManager assignmentManager, CatalogTracker catalogTracker)
throws IOException {
// If table is not disabled but the region is offlined,
boolean disabled = assignmentManager.isTableDisabled(
boolean disabled = assignmentManager.getZKTable().isDisabledTable(
hri.getTableDesc().getNameAsString());
if (disabled) return false;
if (hri.isOffline() && hri.isSplit()) {

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
/**
* Base class for performing operations against tables.
@ -42,6 +43,7 @@ public abstract class TableEventHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(TableEventHandler.class);
protected final MasterServices masterServices;
protected final byte [] tableName;
protected final String tableNameStr;
public TableEventHandler(EventType eventType, byte [] tableName, Server server,
MasterServices masterServices)
@ -50,6 +52,7 @@ public abstract class TableEventHandler extends EventHandler {
this.masterServices = masterServices;
this.tableName = tableName;
this.masterServices.checkTableModifiable(tableName);
this.tableNameStr = Bytes.toString(this.tableName);
}
@Override
@ -62,11 +65,12 @@ public abstract class TableEventHandler extends EventHandler {
tableName);
handleTableOperation(hris);
} catch (IOException e) {
LOG.error("Error trying to delete the table " + Bytes.toString(tableName),
e);
LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
} catch (KeeperException e) {
LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
}
}
protected abstract void handleTableOperation(List<HRegionInfo> regions)
throws IOException;
throws IOException, KeeperException;
}

View File

@ -55,12 +55,12 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
@ -73,7 +73,6 @@ import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -2536,47 +2535,6 @@ public class HRegion implements HeapSize { // , Writable{
}
}
/**
* Utility method used by HMaster marking regions offlined.
* @param srvr META server to be updated
* @param metaRegionName Meta region name
* @param info HRegion to update in <code>meta</code>
*
* @throws IOException
*/
public static void offlineRegionInMETA(final HRegionInterface srvr,
final byte [] metaRegionName, final HRegionInfo info)
throws IOException {
// Puts and Deletes used to be "atomic" here. We can use row locks if
// we need to keep that property, or we can expand Puts and Deletes to
// allow them to be committed at once.
byte [] row = info.getRegionName();
Put put = new Put(row);
info.setOffline(true);
put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
Writables.getBytes(info));
srvr.put(metaRegionName, put);
cleanRegionInMETA(srvr, metaRegionName, info);
}
/**
* Clean COL_SERVER and COL_STARTCODE for passed <code>info</code> in
* <code>.META.</code>
* @param srvr
* @param metaRegionName
* @param info
* @throws IOException
*/
public static void cleanRegionInMETA(final HRegionInterface srvr,
final byte [] metaRegionName, final HRegionInfo info)
throws IOException {
Delete del = new Delete(info.getRegionName());
del.deleteColumns(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
del.deleteColumns(HConstants.CATALOG_FAMILY,
HConstants.STARTCODE_QUALIFIER);
srvr.delete(metaRegionName, del);
}
/**
* Deletes all the files for a HRegion
*

View File

@ -0,0 +1,298 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
/**
* Helper class for table state tracking for use by {@link AssignmentManager}.
* Reads, caches and sets state up in zookeeper. If multiple read/write
* clients, will make for confusion. Read-only clients other than
* AssignmentManager interested in learning table state can use the
* read-only utility methods {@link #isEnabledTable(ZooKeeperWatcher, String)}
* and {@link #isDisabledTable(ZooKeeperWatcher, String)}.
*
* <p>To save on trips to the zookeeper ensemble, internally we cache table
* state.
*/
public class ZKTable {
// A znode will exist under the table directory if it is in any of the
// following states: {@link TableState#ENABLING} , {@link TableState#DISABLING},
// or {@link TableState#DISABLED}. If {@link TableState#ENABLED}, there will
// be no entry for a table in zk. Thats how it currently works.
private static final Log LOG = LogFactory.getLog(ZKTable.class);
private final ZooKeeperWatcher watcher;
/**
* Cache of what we found in zookeeper so we don't have to go to zk ensemble
* for every query. Synchronize access rather than use concurrent Map because
* synchronization needs to span query of zk.
*/
private final Map<String, TableState> cache =
new HashMap<String, TableState>();
// TODO: Make it so always a table znode. Put table schema here as well as table state.
// Have watcher on table znode so all are notified of state or schema change.
/**
* States a Table can be in.
* {@link TableState#ENABLED} is not used currently; its the absence of state
* in zookeeper that indicates an enabled table currently.
*/
public static enum TableState {
ENABLED,
DISABLED,
DISABLING,
ENABLING
};
public ZKTable(final ZooKeeperWatcher zkw) throws KeeperException {
super();
this.watcher = zkw;
populateTableStates();
}
/**
* Gets a list of all the tables set as disabled in zookeeper.
* @param zkw
* @return list of disabled tables, empty list if none
* @throws KeeperException
*/
private void populateTableStates()
throws KeeperException {
synchronized (this.cache) {
List<String> children =
ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.tableZNode);
for (String child: children) {
TableState state = getTableState(this.watcher, child);
if (state != null) this.cache.put(child, state);
}
}
}
/**
* @param zkw
* @param child
* @return Null or {@link TableState} found in znode.
* @throws KeeperException
*/
private static TableState getTableState(final ZooKeeperWatcher zkw,
final String child)
throws KeeperException {
String znode = ZKUtil.joinZNode(zkw.tableZNode, child);
byte [] data = ZKUtil.getData(zkw, znode);
if (data == null || data.length <= 0) {
// Null if table is enabled.
return null;
}
String str = Bytes.toString(data);
try {
return TableState.valueOf(str);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(str);
}
}
/**
* Sets the specified table as DISABLED in zookeeper. Fails silently if the
* table is already disabled in zookeeper. Sets no watches.
* @param tableName
* @throws KeeperException unexpected zookeeper exception
*/
public void setDisabledTable(String tableName)
throws KeeperException {
synchronized (this.cache) {
if (!isDisablingOrDisabledTable(tableName)) {
LOG.warn("Moving table " + tableName + " state to disabled but was " +
"not first in disabling state: " + this.cache.get(tableName));
}
setTableState(tableName, TableState.DISABLED);
}
}
/**
* Sets the specified table as DISABLING in zookeeper. Fails silently if the
* table is already disabled in zookeeper. Sets no watches.
* @param tableName
* @throws KeeperException unexpected zookeeper exception
*/
public void setDisablingTable(final String tableName)
throws KeeperException {
synchronized (this.cache) {
if (!isEnabledOrDisablingTable(tableName)) {
LOG.warn("Moving table " + tableName + " state to disabling but was " +
"not first in enabled state: " + this.cache.get(tableName));
}
setTableState(tableName, TableState.DISABLING);
}
}
/**
* Sets the specified table as ENABLING in zookeeper. Fails silently if the
* table is already disabled in zookeeper. Sets no watches.
* @param tableName
* @throws KeeperException unexpected zookeeper exception
*/
public void setEnablingTable(final String tableName)
throws KeeperException {
synchronized (this.cache) {
if (!isDisabledOrEnablingTable(tableName)) {
LOG.warn("Moving table " + tableName + " state to disabling but was " +
"not first in enabled state: " + this.cache.get(tableName));
}
setTableState(tableName, TableState.ENABLING);
}
}
private void setTableState(final String tableName, final TableState state)
throws KeeperException {
String znode = ZKUtil.joinZNode(this.watcher.tableZNode, tableName);
if (ZKUtil.checkExists(this.watcher, znode) == -1) {
ZKUtil.createAndFailSilent(this.watcher, znode);
}
synchronized (this.cache) {
ZKUtil.setData(this.watcher, znode, Bytes.toBytes(state.toString()));
this.cache.put(tableName, state);
}
}
public boolean isDisabledTable(final String tableName) {
return isTableState(tableName, TableState.DISABLED);
}
/**
* Go to zookeeper and see if state of table is {@link TableState#DISABLED}.
* This method does not use cache as {@link #isDisabledTable(String)} does.
* This method is for clients other than {@link AssignmentManager}
* @param zkw
* @param tableName
* @return True if table is enabled.
* @throws KeeperException
*/
public static boolean isDisabledTable(final ZooKeeperWatcher zkw,
final String tableName)
throws KeeperException {
TableState state = getTableState(zkw, tableName);
return isTableState(TableState.DISABLED, state);
}
public boolean isDisablingTable(final String tableName) {
return isTableState(tableName, TableState.DISABLING);
}
public boolean isEnablingTable(final String tableName) {
return isTableState(tableName, TableState.ENABLING);
}
public boolean isEnabledTable(String tableName) {
synchronized (this.cache) {
// No entry in cache means enabled table.
return !this.cache.containsKey(tableName);
}
}
/**
* Go to zookeeper and see if state of table is {@link TableState#ENABLED}.
* This method does not use cache as {@link #isEnabledTable(String)} does.
* This method is for clients other than {@link AssignmentManager}
* @param zkw
* @param tableName
* @return True if table is enabled.
* @throws KeeperException
*/
public static boolean isEnabledTable(final ZooKeeperWatcher zkw,
final String tableName)
throws KeeperException {
return getTableState(zkw, tableName) == null;
}
public boolean isDisablingOrDisabledTable(final String tableName) {
synchronized (this.cache) {
return isDisablingTable(tableName) || isDisabledTable(tableName);
}
}
public boolean isEnabledOrDisablingTable(final String tableName) {
synchronized (this.cache) {
return isEnabledTable(tableName) || isDisablingTable(tableName);
}
}
public boolean isDisabledOrEnablingTable(final String tableName) {
synchronized (this.cache) {
return isDisabledTable(tableName) || isEnablingTable(tableName);
}
}
private boolean isTableState(final String tableName, final TableState state) {
synchronized (this.cache) {
TableState currentState = this.cache.get(tableName);
return isTableState(currentState, state);
}
}
private static boolean isTableState(final TableState expectedState,
final TableState currentState) {
return currentState != null && currentState.equals(expectedState);
}
/**
* Enables the table in zookeeper. Fails silently if the
* table is not currently disabled in zookeeper. Sets no watches.
* @param tableName
* @throws KeeperException unexpected zookeeper exception
*/
public void setEnabledTable(final String tableName)
throws KeeperException {
synchronized (this.cache) {
if (this.cache.remove(tableName) == null) {
LOG.warn("Moving table " + tableName + " state to enabled but was " +
"already enabled");
}
ZKUtil.deleteNodeFailSilent(this.watcher,
ZKUtil.joinZNode(this.watcher.tableZNode, tableName));
}
}
/**
* Gets a list of all the tables set as disabled in zookeeper.
* @return Set of disabled tables, empty Set if none
*/
public Set<String> getDisabledTables() {
Set<String> disabledTables = new HashSet<String>();
synchronized (this.cache) {
Set<String> tables = this.cache.keySet();
for (String table: tables) {
if (isDisabledTable(table)) disabledTables.add(table);
}
}
return disabledTables;
}
}

View File

@ -1,70 +0,0 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.util.List;
import org.apache.zookeeper.KeeperException;
/**
* Helper class for table disable tracking in zookeeper.
* <p>
* The node /disabled will contain a child node for every table which should be
* disabled, for example, /disabled/table.
*/
public class ZKTableDisable {
/**
* Sets the specified table as disabled in zookeeper. Fails silently if the
* table is already disabled in zookeeper. Sets no watches.
* @param zkw
* @param tableName
* @throws KeeperException unexpected zookeeper exception
*/
public static void disableTable(ZooKeeperWatcher zkw, String tableName)
throws KeeperException {
ZKUtil.createAndFailSilent(zkw, ZKUtil.joinZNode(zkw.tableZNode,
tableName));
}
/**
* Unsets the specified table as disabled in zookeeper. Fails silently if the
* table is not currently disabled in zookeeper. Sets no watches.
* @param zkw
* @param tableName
* @throws KeeperException unexpected zookeeper exception
*/
public static void undisableTable(ZooKeeperWatcher zkw, String tableName)
throws KeeperException {
ZKUtil.deleteNodeFailSilent(zkw, ZKUtil.joinZNode(zkw.tableZNode,
tableName));
}
/**
* Gets a list of all the tables set as disabled in zookeeper.
* @param zkw
* @return list of disabled tables, empty list if none
* @throws KeeperException
*/
public static List<String> getDisabledTables(ZooKeeperWatcher zkw)
throws KeeperException {
return ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode);
}
}

View File

@ -712,6 +712,23 @@ public class ZKUtil {
}
}
/**
* Set data into node creating node if it doesn't yet exist.
* Does not set watch.
* @param zkw zk reference
* @param znode path of node
* @param data data to set for node
* @throws KeeperException
*/
public static void createSetData(final ZooKeeperWatcher zkw, final String znode,
final byte [] data)
throws KeeperException {
if (checkExists(zkw, znode) != -1) {
ZKUtil.createWithParents(zkw, znode);
}
ZKUtil.setData(zkw, znode, data);
}
/**
* Sets the data of the existing znode to be the specified data. The node
* must exist but no checks are done on the existing data or version.
@ -902,8 +919,7 @@ public class ZKUtil {
* @param znode path of node
* @throws KeeperException if unexpected zookeeper exception
*/
public static void createWithParents(ZooKeeperWatcher zkw,
String znode)
public static void createWithParents(ZooKeeperWatcher zkw, String znode)
throws KeeperException {
try {
if(znode == null) {

View File

@ -86,6 +86,12 @@ module Hbase
@admin.disableTable(table_name)
end
#----------------------------------------------------------------------------------------------
# Is table disabled?
def disabled?(table_name)
@admin.isTableDisabled(table_name)
end
#----------------------------------------------------------------------------------------------
# Drops a table
def drop(table_name)

View File

@ -217,8 +217,10 @@ Shell.load_command_group(
create
describe
disable
is_disabled
drop
enable
is_enabled
exists
list
]

View File

@ -23,7 +23,7 @@ module Shell
class Disable < Command
def help
return <<-EOF
Disable the named table: e.g. "hbase> disable 't1'"
Start disable of named table: e.g. "hbase> disable 't1'"
EOF
end

View File

@ -23,7 +23,7 @@ module Shell
class Enable < Command
def help
return <<-EOF
Enable the named table: e.g. "hbase> enable 't1'"
Start enable of named table: e.g. "hbase> enable 't1'"
EOF
end

View File

@ -0,0 +1,78 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class IsDisabled < Command
def help
return <<-EOF
Is named table disabled?: e.g. "hbase> is_disabled 't1'"
EOF
end
def command(table)
format_simple_command do
formatter.row([
admin.disabled?(table)? "true" : "false"
])
end
end
end
end
end
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class IsDisabled < Command
def help
return <<-EOF
Is table disabled: e.g. "hbase> is_disabled 't1'"
EOF
end
def command(table)
format_simple_command do
formatter.row([
admin.disabled?(table) ? "true" : "false"
])
end
end
end
end
end

View File

@ -0,0 +1,78 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class IsEnabled < Command
def help
return <<-EOF
Is named table enabled?: e.g. "hbase> is_enabled 't1'"
EOF
end
def command(table)
format_simple_command do
formatter.row([
admin.enabled?(table)? "true" : "false"
])
end
end
end
end
end
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class IsEnabled < Command
def help
return <<-EOF
Is table enabled: e.g. "hbase> enable 't1'"
EOF
end
def command(table)
format_simple_command do
formatter.row([
admin.enabled?(table) ? "true" : "false"
])
end
end
end
end
end

View File

@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -46,10 +45,8 @@ import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@ -84,6 +81,45 @@ public class TestAdmin {
this.admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
}
@Test
public void testDisableAndEnableTable() throws IOException {
final byte [] row = Bytes.toBytes("row");
final byte [] qualifier = Bytes.toBytes("qualifier");
final byte [] value = Bytes.toBytes("value");
final byte [] table = Bytes.toBytes("testDisableAndEnableTable");
HTable ht = TEST_UTIL.createTable(table, HConstants.CATALOG_FAMILY);
Put put = new Put(row);
put.add(HConstants.CATALOG_FAMILY, qualifier, value);
ht.put(put);
Get get = new Get(row);
get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
ht.get(get);
this.admin.disableTable(table);
// Test that table is disabled
get = new Get(row);
get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
boolean ok = false;
try {
ht.get(get);
} catch (NotServingRegionException e) {
ok = true;
} catch (RetriesExhaustedException e) {
ok = true;
}
assertTrue(ok);
this.admin.enableTable(table);
// Test that table is enabled
try {
ht.get(get);
} catch (RetriesExhaustedException e) {
ok = false;
}
assertTrue(ok);
}
@Test
public void testHBaseFsck() throws IOException {
HBaseFsck fsck =
@ -429,45 +465,6 @@ public class TestAdmin {
}
}
@Test
public void testDisableAndEnableTable() throws IOException {
final byte [] row = Bytes.toBytes("row");
final byte [] qualifier = Bytes.toBytes("qualifier");
final byte [] value = Bytes.toBytes("value");
final byte [] table = Bytes.toBytes("testDisableAndEnableTable");
HTable ht = TEST_UTIL.createTable(table, HConstants.CATALOG_FAMILY);
Put put = new Put(row);
put.add(HConstants.CATALOG_FAMILY, qualifier, value);
ht.put(put);
Get get = new Get(row);
get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
ht.get(get);
this.admin.disableTable(table);
// Test that table is disabled
get = new Get(row);
get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
boolean ok = false;
try {
ht.get(get);
} catch (NotServingRegionException e) {
ok = true;
} catch (RetriesExhaustedException e) {
ok = true;
}
assertTrue(ok);
this.admin.enableTable(table);
//Test that table is enabled
try {
ht.get(get);
} catch (RetriesExhaustedException e) {
ok = false;
}
assertTrue(ok);
}
@Test
public void testTableExist() throws IOException {
final byte [] table = Bytes.toBytes("testTableExist");
@ -769,4 +766,4 @@ public class TestAdmin {
this.admin.deleteTable(tableName);
}
}
}
}

View File

@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;
@ -324,7 +324,8 @@ public class TestMasterFailover {
log("Beginning to mock scenarios");
// Disable the disabledTable in ZK
ZKTableDisable.disableTable(zkw, Bytes.toString(disabledTable));
ZKTable zktable = new ZKTable(zkw);
zktable.setDisabledTable(Bytes.toString(disabledTable));
/*
* ZK = OFFLINE
@ -652,7 +653,8 @@ public class TestMasterFailover {
log("Beginning to mock scenarios");
// Disable the disabledTable in ZK
ZKTableDisable.disableTable(zkw, Bytes.toString(disabledTable));
ZKTable zktable = new ZKTable(zkw);
zktable.setDisabledTable(Bytes.toString(disabledTable));
/*
* ZK = CLOSING

View File

@ -0,0 +1,89 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestZKTable {
private static final Log LOG = LogFactory.getLog(TestZooKeeperNodeTracker.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniZKCluster();
}
@Test
public void testTableStates()
throws ZooKeeperConnectionException, IOException, KeeperException {
final String name = "testDisabled";
Abortable abortable = new Abortable() {
@Override
public void abort(String why, Throwable e) {
LOG.info(why, e);
}
};
ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
name, abortable);
ZKTable zkt = new ZKTable(zkw);
assertTrue(zkt.isEnabledTable(name));
assertFalse(zkt.isDisablingTable(name));
assertFalse(zkt.isDisabledTable(name));
assertFalse(zkt.isEnablingTable(name));
assertFalse(zkt.isDisablingOrDisabledTable(name));
assertFalse(zkt.isDisabledOrEnablingTable(name));
zkt.setDisablingTable(name);
assertTrue(zkt.isDisablingTable(name));
assertTrue(zkt.isDisablingOrDisabledTable(name));
assertFalse(zkt.getDisabledTables().contains(name));
zkt.setDisabledTable(name);
assertTrue(zkt.isDisabledTable(name));
assertTrue(zkt.isDisablingOrDisabledTable(name));
assertFalse(zkt.isDisablingTable(name));
assertTrue(zkt.getDisabledTables().contains(name));
zkt.setEnablingTable(name);
assertTrue(zkt.isEnablingTable(name));
assertTrue(zkt.isDisabledOrEnablingTable(name));
assertFalse(zkt.isDisabledTable(name));
assertFalse(zkt.getDisabledTables().contains(name));
zkt.setEnabledTable(name);
assertTrue(zkt.isEnabledTable(name));
assertFalse(zkt.isEnablingTable(name));
}
}