HBASE-15105 Procedure V2 - Procedure Queue with Namespaces

This commit is contained in:
Matteo Bertozzi 2016-01-14 13:45:17 -08:00
parent 18a48af242
commit ae7cc0c848
6 changed files with 282 additions and 115 deletions

View File

@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.NavigableSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@ -76,35 +74,14 @@ public class TableNamespaceManager {
private ZKNamespaceManager zkNamespaceManager;
private boolean initialized;
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public static final String KEY_MAX_REGIONS = "hbase.namespace.quota.maxregions";
public static final String KEY_MAX_TABLES = "hbase.namespace.quota.maxtables";
static final String NS_INIT_TIMEOUT = "hbase.master.namespace.init.timeout";
static final int DEFAULT_NS_INIT_TIMEOUT = 300000;
/** Configuration key for time out for trying to acquire table locks */
private static final String TABLE_WRITE_LOCK_TIMEOUT_MS =
"hbase.table.write.lock.timeout.ms";
/** Configuration key for time out for trying to acquire table locks */
private static final String TABLE_READ_LOCK_TIMEOUT_MS =
"hbase.table.read.lock.timeout.ms";
private static final long DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS = 600 * 1000; //10 min default
private static final long DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS = 600 * 1000; //10 min default
private long exclusiveLockTimeoutMs;
private long sharedLockTimeoutMs;
TableNamespaceManager(MasterServices masterServices) {
this.masterServices = masterServices;
this.conf = masterServices.getConfiguration();
this.exclusiveLockTimeoutMs = conf.getLong(
TABLE_WRITE_LOCK_TIMEOUT_MS,
DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS);
this.sharedLockTimeoutMs = conf.getLong(
TABLE_READ_LOCK_TIMEOUT_MS,
DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS);
}
public void start() throws IOException {
@ -138,30 +115,6 @@ public class TableNamespaceManager {
return nsTable;
}
private boolean acquireSharedLock() throws IOException {
try {
return rwLock.readLock().tryLock(sharedLockTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
}
public void releaseSharedLock() {
rwLock.readLock().unlock();
}
public boolean acquireExclusiveLock() {
try {
return rwLock.writeLock().tryLock(exclusiveLockTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
return false;
}
}
public void releaseExclusiveLock() {
rwLock.writeLock().unlock();
}
/*
* check whether a namespace has already existed.
*/
@ -229,13 +182,7 @@ public class TableNamespaceManager {
Sets.newTreeSet(NamespaceDescriptor.NAMESPACE_DESCRIPTOR_COMPARATOR);
ResultScanner scanner =
getNamespaceTable().getScanner(HTableDescriptor.NAMESPACE_FAMILY_INFO_BYTES);
boolean locked = false;
try {
locked = acquireSharedLock();
if (!locked) {
throw new IOException(
"Fail to acquire lock to scan namespace list. Some namespace DDL is in progress.");
}
for(Result r : scanner) {
byte[] val = CellUtil.cloneValue(r.getColumnLatestCell(
HTableDescriptor.NAMESPACE_FAMILY_INFO_BYTES,
@ -245,9 +192,6 @@ public class TableNamespaceManager {
}
} finally {
scanner.close();
if (locked) {
releaseSharedLock();
}
}
return ret;
}

View File

@ -200,23 +200,19 @@ public class CreateNamespaceProcedure
// Namespace manager might not be ready if master is not fully initialized,
// return false to reject user namespace creation; return true for default
// and system namespace creation (this is part of master initialization).
if (nsDescriptor.equals(NamespaceDescriptor.DEFAULT_NAMESPACE) ||
nsDescriptor.equals(NamespaceDescriptor.SYSTEM_NAMESPACE)) {
return true;
}
boolean isBootstrapNs = nsDescriptor.equals(NamespaceDescriptor.DEFAULT_NAMESPACE) ||
nsDescriptor.equals(NamespaceDescriptor.SYSTEM_NAMESPACE);
if (env.waitInitialized(this)) {
if (!isBootstrapNs && env.waitInitialized(this)) {
return false;
}
}
return getTableNamespaceManager(env).acquireExclusiveLock();
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(getNamespaceName());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
if (env.getMasterServices().isInitialized()) {
getTableNamespaceManager(env).releaseExclusiveLock();
}
env.getProcedureQueue().releaseNamespaceExclusiveLock(getNamespaceName());
}
@Override
@ -229,6 +225,10 @@ public class CreateNamespaceProcedure
return TableOperationType.EDIT;
}
private String getNamespaceName() {
return nsDescriptor.getName();
}
/**
* Action before any real action of creating namespace.
* @param env MasterProcedureEnv

View File

@ -212,12 +212,13 @@ public class DeleteNamespaceProcedure
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
return getTableNamespaceManager(env).acquireExclusiveLock();
if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(getNamespaceName());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
getTableNamespaceManager(env).releaseExclusiveLock();
env.getProcedureQueue().releaseNamespaceExclusiveLock(getNamespaceName());
}
@Override
@ -230,6 +231,10 @@ public class DeleteNamespaceProcedure
return TableOperationType.EDIT;
}
private String getNamespaceName() {
return namespaceName;
}
/**
* Action before any real action of deleting namespace.
* @param env MasterProcedureEnv

View File

@ -309,9 +309,9 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (!suspendQueue) suspendQueue = true;
if (isTableProcedure(procedure)) {
suspendTableQueue(event, getTableName(procedure));
waitTableEvent(event, procedure, suspendQueue);
} else if (isServerProcedure(procedure)) {
suspendServerQueue(event, getServerName(procedure));
waitServerEvent(event, procedure, suspendQueue);
} else {
// TODO: at the moment we only have Table and Server procedures
// if you are implementing a non-table/non-server procedure, you have two options: create
@ -324,15 +324,21 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return true;
}
private void suspendTableQueue(ProcedureEvent event, TableName tableName) {
private void waitTableEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) {
final TableName tableName = getTableName(procedure);
final boolean isDebugEnabled = LOG.isDebugEnabled();
schedLock.lock();
try {
TableQueue queue = getTableQueue(tableName);
if (!queue.setSuspended(true)) return;
if (queue.isSuspended()) return;
if (LOG.isDebugEnabled()) {
// TODO: if !suspendQueue
if (isDebugEnabled) {
LOG.debug("Suspend table queue " + tableName);
}
queue.setSuspended(true);
removeFromRunQueue(tableRunQueue, queue);
event.suspendTableQueue(queue);
} finally {
@ -340,16 +346,22 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
}
private void suspendServerQueue(ProcedureEvent event, ServerName serverName) {
private void waitServerEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) {
final ServerName serverName = getServerName(procedure);
final boolean isDebugEnabled = LOG.isDebugEnabled();
schedLock.lock();
try {
// TODO: This will change once we have the new AM
ServerQueue queue = getServerQueue(serverName);
if (!queue.setSuspended(true)) return;
if (queue.isSuspended()) return;
if (LOG.isDebugEnabled()) {
// TODO: if !suspendQueue
if (isDebugEnabled) {
LOG.debug("Suspend server queue " + serverName);
}
queue.setSuspended(true);
removeFromRunQueue(serverRunQueue, queue);
event.suspendServerQueue(queue);
} finally {
@ -358,18 +370,20 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
public void suspend(ProcedureEvent event) {
final boolean isDebugEnabled = LOG.isDebugEnabled();
synchronized (event) {
event.setReady(false);
if (LOG.isDebugEnabled()) {
if (isDebugEnabled) {
LOG.debug("Suspend event " + event);
}
}
}
public void wake(ProcedureEvent event) {
final boolean isDebugEnabled = LOG.isDebugEnabled();
synchronized (event) {
event.setReady(true);
if (LOG.isDebugEnabled()) {
if (isDebugEnabled) {
LOG.debug("Wake event " + event);
}
@ -467,7 +481,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
Queue<TableName> node = AvlTree.get(tableMap, tableName);
if (node != null) return (TableQueue)node;
node = new TableQueue(tableName, getTablePriority(tableName));
NamespaceQueue nsQueue = getNamespaceQueue(tableName.getNamespaceAsString());
node = new TableQueue(tableName, nsQueue, getTablePriority(tableName));
tableMap = AvlTree.insert(tableMap, node);
return (TableQueue)node;
}
@ -493,6 +508,18 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return ((TableProcedureInterface)proc).getTableName();
}
// ============================================================================
// Namespace Queue Lookup Helpers
// ============================================================================
private NamespaceQueue getNamespaceQueue(String namespace) {
Queue<String> node = AvlTree.get(namespaceMap, namespace);
if (node != null) return (NamespaceQueue)node;
node = new NamespaceQueue(namespace);
namespaceMap = AvlTree.insert(namespaceMap, node);
return (NamespaceQueue)node;
}
// ============================================================================
// Server Queue Lookup Helpers
// ============================================================================
@ -559,10 +586,22 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
public static class TableQueue extends QueueImpl<TableName> {
private final NamespaceQueue namespaceQueue;
private TableLock tableLock = null;
public TableQueue(TableName tableName, int priority) {
public TableQueue(TableName tableName, NamespaceQueue namespaceQueue, int priority) {
super(tableName, priority);
this.namespaceQueue = namespaceQueue;
}
public NamespaceQueue getNamespaceQueue() {
return namespaceQueue;
}
@Override
public synchronized boolean isAvailable() {
return super.isAvailable() && !namespaceQueue.hasExclusiveLock();
}
// TODO: We can abort pending/in-progress operation if the new call is
@ -584,9 +623,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
case CREATE:
case DELETE:
case DISABLE:
case EDIT:
case ENABLE:
return true;
case EDIT:
// we allow concurrent edit on the NS table
return !tpi.getTableName().equals(TableName.NAMESPACE_TABLE_NAME);
case READ:
return false;
default:
@ -595,10 +636,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
throw new UnsupportedOperationException("unexpected type " + tpi.getTableOperationType());
}
private synchronized boolean trySharedLock(final TableLockManager lockManager,
private synchronized boolean tryZkSharedLock(final TableLockManager lockManager,
final String purpose) {
if (hasExclusiveLock()) return false;
// Take zk-read-lock
TableName tableName = getKey();
tableLock = lockManager.readLock(tableName, purpose);
@ -609,14 +648,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
tableLock = null;
return false;
}
trySharedLock();
return true;
}
private synchronized void releaseSharedLock(final TableLockManager lockManager) {
private synchronized void releaseZkSharedLock(final TableLockManager lockManager) {
releaseTableLock(lockManager, isSingleSharedLock());
releaseSharedLock();
}
private synchronized boolean tryZkExclusiveLock(final TableLockManager lockManager,
@ -653,8 +689,44 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
}
/**
* the namespace is currently used just as a rwlock, not as a queue.
* because ns operation are not frequent enough. so we want to avoid
* having to move table queues around for suspend/resume.
*/
private static class NamespaceQueue extends Queue<String> {
public NamespaceQueue(String namespace) {
super(namespace);
}
@Override
public boolean requireExclusiveLock(Procedure proc) {
throw new UnsupportedOperationException();
}
@Override
public void add(final Procedure proc, final boolean addToFront) {
throw new UnsupportedOperationException();
}
@Override
public Procedure poll() {
throw new UnsupportedOperationException();
}
@Override
public boolean isEmpty() {
throw new UnsupportedOperationException();
}
@Override
public int size() {
throw new UnsupportedOperationException();
}
}
// ============================================================================
// Locking Helpers
// Table Locking Helpers
// ============================================================================
/**
* Try to acquire the exclusive lock on the specified table.
@ -666,8 +738,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) {
schedLock.lock();
TableQueue queue = getTableQueue(table);
boolean hasXLock = queue.tryExclusiveLock();
if (!hasXLock) {
if (!queue.getNamespaceQueue().trySharedLock()) {
return false;
}
if (!queue.tryExclusiveLock()) {
queue.getNamespaceQueue().releaseSharedLock();
schedLock.unlock();
return false;
}
@ -676,10 +752,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
schedLock.unlock();
// Zk lock is expensive...
hasXLock = queue.tryZkExclusiveLock(lockManager, purpose);
boolean hasXLock = queue.tryZkExclusiveLock(lockManager, purpose);
if (!hasXLock) {
schedLock.lock();
queue.releaseExclusiveLock();
queue.getNamespaceQueue().releaseSharedLock();
addToRunQueue(tableRunQueue, queue);
schedLock.unlock();
}
@ -700,6 +777,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
schedLock.lock();
queue.releaseExclusiveLock();
queue.getNamespaceQueue().releaseSharedLock();
addToRunQueue(tableRunQueue, queue);
schedLock.unlock();
}
@ -712,7 +790,29 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
* @return true if we were able to acquire the lock on the table, otherwise false.
*/
public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) {
return getTableQueueWithLock(table).trySharedLock(lockManager, purpose);
schedLock.lock();
TableQueue queue = getTableQueue(table);
if (!queue.getNamespaceQueue().trySharedLock()) {
return false;
}
if (!queue.trySharedLock()) {
queue.getNamespaceQueue().releaseSharedLock();
schedLock.unlock();
return false;
}
schedLock.unlock();
// Zk lock is expensive...
boolean hasXLock = queue.tryZkSharedLock(lockManager, purpose);
if (!hasXLock) {
schedLock.lock();
queue.releaseSharedLock();
queue.getNamespaceQueue().releaseSharedLock();
schedLock.unlock();
}
return hasXLock;
}
/**
@ -720,7 +820,17 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
* @param table the name of the table that has the shared lock
*/
public void releaseTableSharedLock(final TableName table) {
getTableQueueWithLock(table).releaseSharedLock(lockManager);
schedLock.lock();
TableQueue queue = getTableQueue(table);
schedLock.unlock();
// Zk lock is expensive...
queue.releaseZkSharedLock(lockManager);
schedLock.lock();
queue.releaseSharedLock();
queue.getNamespaceQueue().releaseSharedLock();
schedLock.unlock();
}
/**
@ -762,13 +872,58 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return true;
}
// ============================================================================
// Namespace Locking Helpers
// ============================================================================
/**
* Try to acquire the exclusive lock on the specified namespace.
* @see #releaseNamespaceExclusiveLock(String)
* @param nsName Namespace to lock
* @return true if we were able to acquire the lock on the namespace, otherwise false.
*/
public boolean tryAcquireNamespaceExclusiveLock(final String nsName) {
schedLock.lock();
try {
TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
if (!tableQueue.trySharedLock()) return false;
NamespaceQueue nsQueue = getNamespaceQueue(nsName);
boolean hasLock = nsQueue.tryExclusiveLock();
if (!hasLock) {
tableQueue.releaseSharedLock();
}
return hasLock;
} finally {
schedLock.unlock();
}
}
/**
* Release the exclusive lock
* @see #tryAcquireNamespaceExclusiveLock(String)
* @param nsName the namespace that has the exclusive lock
*/
public void releaseNamespaceExclusiveLock(final String nsName) {
schedLock.lock();
try {
TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
tableQueue.releaseSharedLock();
NamespaceQueue queue = getNamespaceQueue(nsName);
queue.releaseExclusiveLock();
} finally {
schedLock.unlock();
}
}
// ============================================================================
// Server Locking Helpers
// ============================================================================
/**
* Release the exclusive lock
* @see #tryAcquireServerExclusiveLock(ServerName)
* @param serverName the server that has the exclusive lock
* Try to acquire the exclusive lock on the specified server.
* @see #releaseServerExclusiveLock(ServerName)
* @param serverName Server to lock
* @return true if we were able to acquire the lock on the server, otherwise false.
*/
public boolean tryAcquireServerExclusiveLock(final ServerName serverName) {
schedLock.lock();

View File

@ -192,12 +192,13 @@ public class ModifyNamespaceProcedure
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
return getTableNamespaceManager(env).acquireExclusiveLock();
if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(getNamespaceName());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
getTableNamespaceManager(env).releaseExclusiveLock();
env.getProcedureQueue().releaseNamespaceExclusiveLock(getNamespaceName());
}
@Override
@ -210,6 +211,10 @@ public class ModifyNamespaceProcedure
return TableOperationType.EDIT;
}
private String getNamespaceName() {
return newNsDescriptor.getName();
}
/**
* Action before any real action of adding namespace.
* @param env MasterProcedureEnv

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -291,6 +292,58 @@ public class TestMasterProcedureScheduler {
assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName));
}
@Test
public void testVerifyNamespaceRwLocks() throws Exception {
String nsName1 = "ns1";
String nsName2 = "ns2";
TableName tableName1 = TableName.valueOf(nsName1, "testtb");
TableName tableName2 = TableName.valueOf(nsName2, "testtb");
queue.addBack(new TestNamespaceProcedure(1, nsName1,
TableProcedureInterface.TableOperationType.EDIT));
queue.addBack(new TestTableProcedure(2, tableName1,
TableProcedureInterface.TableOperationType.EDIT));
queue.addBack(new TestTableProcedure(3, tableName2,
TableProcedureInterface.TableOperationType.EDIT));
queue.addBack(new TestNamespaceProcedure(4, nsName2,
TableProcedureInterface.TableOperationType.EDIT));
// Fetch the 1st item and take the write lock
long procId = queue.poll().getProcId();
assertEquals(1, procId);
assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(nsName1));
// System tables have 2 as default priority
Procedure proc = queue.poll();
assertEquals(4, proc.getProcId());
assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(nsName2));
queue.releaseNamespaceExclusiveLock(nsName2);
queue.yield(proc);
// table on ns1 is locked, so we get table on ns2
procId = queue.poll().getProcId();
assertEquals(3, procId);
assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName2, "lock " + procId));
// ns2 is not available (TODO we may avoid this one)
proc = queue.poll();
assertEquals(4, proc.getProcId());
assertEquals(false, queue.tryAcquireNamespaceExclusiveLock(nsName2));
queue.yield(proc);
// release the ns1 lock
queue.releaseNamespaceExclusiveLock(nsName1);
// we are now able to execute table of ns1
procId = queue.poll().getProcId();
assertEquals(2, procId);
queue.releaseTableExclusiveLock(tableName2);
// we are now able to execute ns2
procId = queue.poll().getProcId();
assertEquals(4, procId);
}
/**
* Verify that "write" operations for a single table are serialized,
* but different tables can be executed in parallel.
@ -440,7 +493,7 @@ public class TestMasterProcedureScheduler {
}
}
public static class TestTableProcedure extends Procedure<Void>
public static class TestTableProcedure extends TestProcedure
implements TableProcedureInterface {
private final TableOperationType opType;
private final TableName tableName;
@ -450,9 +503,9 @@ public class TestMasterProcedureScheduler {
}
public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) {
super(procId);
this.tableName = tableName;
this.opType = opType;
setProcId(procId);
}
@Override
@ -464,26 +517,31 @@ public class TestMasterProcedureScheduler {
public TableOperationType getTableOperationType() {
return opType;
}
}
@Override
protected Procedure[] execute(Void env) {
return null;
public static class TestNamespaceProcedure extends TestProcedure
implements TableProcedureInterface {
private final TableOperationType opType;
private final String nsName;
public TestNamespaceProcedure() {
throw new UnsupportedOperationException("recovery should not be triggered here");
}
public TestNamespaceProcedure(long procId, String nsName, TableOperationType opType) {
super(procId);
this.nsName = nsName;
this.opType = opType;
}
@Override
protected void rollback(Void env) {
throw new UnsupportedOperationException();
public TableName getTableName() {
return TableName.NAMESPACE_TABLE_NAME;
}
@Override
protected boolean abort(Void env) {
throw new UnsupportedOperationException();
}
@Override
protected void serializeStateData(final OutputStream stream) throws IOException {}
@Override
protected void deserializeStateData(final InputStream stream) throws IOException {}
public TableOperationType getTableOperationType() {
return opType;
}
}
}