From 50d58edca07adebba4f61794eb1b8a31a90b2d32 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Tue, 9 Nov 2010 21:52:35 +0000 Subject: [PATCH] HBASE-3112 Enable and disable of table needs a bit of loving in new master git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1033251 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/master/BulkAssigner.java | 106 ++++++++++++++++++ .../hbase/regionserver/SplitTransaction.java | 1 + .../hadoop/hbase/zookeeper/ZKTable.java | 18 +++ 3 files changed, 125 insertions(+) create mode 100644 src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java diff --git a/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java b/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java new file mode 100644 index 00000000000..b19c1f52462 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java @@ -0,0 +1,106 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.Executors; + +import org.apache.hadoop.hbase.Server; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Base class used bulk assigning and unassigning regions. + * Encapsulates a fixed size thread pool of executors to run assignment/unassignment. + * Implement {@link #populatePool(java.util.concurrent.ExecutorService)} and + * {@link #waitUntilDone(long)}. + */ +public abstract class BulkAssigner { + final Server server; + + /** + * @param server An instance of Server + * @param regionsInTransition A reference to {@link AssignmentManager#regionsInTransition} + */ + public BulkAssigner(final Server server) { + this.server = server; + } + + protected String getThreadNamePrefix() { + return this.server.getServerName() + "-BulkAssigner"; + } + + protected UncaughtExceptionHandler getUncaughtExceptionHandler() { + return new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + // Abort if exception of any kind. + server.abort("Uncaught exception in " + t.getName(), e); + } + }; + } + + protected int getThreadCount() { + return this.server.getConfiguration(). + getInt("hbase.bulk.assignment.threadpool.size", 20); + } + + protected long getTimeoutOnRIT() { + return this.server.getConfiguration(). + getLong("hbase.bulk.assignment.waiton.empty.rit", 10 * 60 * 1000); + } + + protected abstract void populatePool(final java.util.concurrent.ExecutorService pool); + + /** + * Run the bulk assign. + * @throws InterruptedException + * @return True if done. + */ + public boolean bulkAssign() throws InterruptedException { + boolean result = false; + ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); + builder.setDaemon(true); + builder.setNameFormat(getThreadNamePrefix() + "-%1$d"); + builder.setUncaughtExceptionHandler(getUncaughtExceptionHandler()); + int threadCount = getThreadCount(); + java.util.concurrent.ExecutorService pool = + Executors.newFixedThreadPool(threadCount, builder.build()); + try { + populatePool(pool); + // How long to wait on empty regions-in-transition. If we timeout, the + // RIT monitor should do fixup. + result = waitUntilDone(getTimeoutOnRIT()); + } finally { + // We're done with the pool. It'll exit when its done all in queue. + pool.shutdown(); + } + return result; + } + + /** + * Wait until bulk assign is done. + * @param timeout How long to wait. + * @throws InterruptedException + * @return True if the condition we were waiting on happened. + */ + protected abstract boolean waitUntilDone(final long timeout) + throws InterruptedException; +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java index 1bcde8cc8bf..530a29c0cd8 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.hadoop.hbase.zookeeper.ZKTable; import org.apache.hadoop.util.Progressable; import org.apache.zookeeper.KeeperException; diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java index c757de89aeb..3a2242fe0cd 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java @@ -239,6 +239,24 @@ public class ZKTable { } } + /** + * Go to zookeeper and see if state of table is {@link TableState#DISABLING} + * of {@link TableState#DISABLED}. + * This method does not use cache as {@link #isEnabledTable(String)} does. + * This method is for clients other than {@link AssignmentManager}. + * @param zkw + * @param tableName + * @return True if table is enabled. + * @throws KeeperException + */ + public static boolean isDisablingOrDisabledTable(final ZooKeeperWatcher zkw, + final String tableName) + throws KeeperException { + TableState state = getTableState(zkw, tableName); + return isTableState(TableState.DISABLING, state) || + isTableState(TableState.DISABLED, state); + } + public boolean isEnabledOrDisablingTable(final String tableName) { synchronized (this.cache) { return isEnabledTable(tableName) || isDisablingTable(tableName);