HBASE-15105 Procedure V2 - Procedure Queue with Namespaces
This commit is contained in:
parent
b753226e72
commit
f8427aba2b
|
@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.master;
|
|||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -70,35 +68,14 @@ public class TableNamespaceManager {
|
|||
private ZKNamespaceManager zkNamespaceManager;
|
||||
private boolean initialized;
|
||||
|
||||
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
|
||||
|
||||
public static final String KEY_MAX_REGIONS = "hbase.namespace.quota.maxregions";
|
||||
public static final String KEY_MAX_TABLES = "hbase.namespace.quota.maxtables";
|
||||
static final String NS_INIT_TIMEOUT = "hbase.master.namespace.init.timeout";
|
||||
static final int DEFAULT_NS_INIT_TIMEOUT = 300000;
|
||||
|
||||
/** Configuration key for time out for trying to acquire table locks */
|
||||
private static final String TABLE_WRITE_LOCK_TIMEOUT_MS =
|
||||
"hbase.table.write.lock.timeout.ms";
|
||||
/** Configuration key for time out for trying to acquire table locks */
|
||||
private static final String TABLE_READ_LOCK_TIMEOUT_MS =
|
||||
"hbase.table.read.lock.timeout.ms";
|
||||
private static final long DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS = 600 * 1000; //10 min default
|
||||
private static final long DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS = 600 * 1000; //10 min default
|
||||
|
||||
private long exclusiveLockTimeoutMs;
|
||||
private long sharedLockTimeoutMs;
|
||||
|
||||
public TableNamespaceManager(MasterServices masterServices) {
|
||||
this.masterServices = masterServices;
|
||||
this.conf = masterServices.getConfiguration();
|
||||
|
||||
this.exclusiveLockTimeoutMs = conf.getLong(
|
||||
TABLE_WRITE_LOCK_TIMEOUT_MS,
|
||||
DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS);
|
||||
this.sharedLockTimeoutMs = conf.getLong(
|
||||
TABLE_READ_LOCK_TIMEOUT_MS,
|
||||
DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
public void start() throws IOException {
|
||||
|
@ -134,30 +111,6 @@ public class TableNamespaceManager {
|
|||
return nsTable;
|
||||
}
|
||||
|
||||
private boolean acquireSharedLock() throws IOException {
|
||||
try {
|
||||
return rwLock.readLock().tryLock(sharedLockTimeoutMs, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void releaseSharedLock() {
|
||||
rwLock.readLock().unlock();
|
||||
}
|
||||
|
||||
public boolean acquireExclusiveLock() {
|
||||
try {
|
||||
return rwLock.writeLock().tryLock(exclusiveLockTimeoutMs, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public void releaseExclusiveLock() {
|
||||
rwLock.writeLock().unlock();
|
||||
}
|
||||
|
||||
/*
|
||||
* check whether a namespace has already existed.
|
||||
*/
|
||||
|
@ -225,13 +178,7 @@ public class TableNamespaceManager {
|
|||
Sets.newTreeSet(NamespaceDescriptor.NAMESPACE_DESCRIPTOR_COMPARATOR);
|
||||
ResultScanner scanner =
|
||||
getNamespaceTable().getScanner(HTableDescriptor.NAMESPACE_FAMILY_INFO_BYTES);
|
||||
boolean locked = false;
|
||||
try {
|
||||
locked = acquireSharedLock();
|
||||
if (!locked) {
|
||||
throw new IOException(
|
||||
"Fail to acquire lock to scan namespace list. Some namespace DDL is in progress.");
|
||||
}
|
||||
for(Result r : scanner) {
|
||||
byte[] val = CellUtil.cloneValue(r.getColumnLatestCell(
|
||||
HTableDescriptor.NAMESPACE_FAMILY_INFO_BYTES,
|
||||
|
@ -241,9 +188,6 @@ public class TableNamespaceManager {
|
|||
}
|
||||
} finally {
|
||||
scanner.close();
|
||||
if (locked) {
|
||||
releaseSharedLock();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -200,23 +200,19 @@ public class CreateNamespaceProcedure
|
|||
// Namespace manager might not be ready if master is not fully initialized,
|
||||
// return false to reject user namespace creation; return true for default
|
||||
// and system namespace creation (this is part of master initialization).
|
||||
if (nsDescriptor.equals(NamespaceDescriptor.DEFAULT_NAMESPACE) ||
|
||||
nsDescriptor.equals(NamespaceDescriptor.SYSTEM_NAMESPACE)) {
|
||||
return true;
|
||||
}
|
||||
boolean isBootstrapNs = nsDescriptor.equals(NamespaceDescriptor.DEFAULT_NAMESPACE) ||
|
||||
nsDescriptor.equals(NamespaceDescriptor.SYSTEM_NAMESPACE);
|
||||
|
||||
if (env.waitInitialized(this)) {
|
||||
if (!isBootstrapNs && env.waitInitialized(this)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return getTableNamespaceManager(env).acquireExclusiveLock();
|
||||
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(getNamespaceName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void releaseLock(final MasterProcedureEnv env) {
|
||||
if (env.getMasterServices().isInitialized()) {
|
||||
getTableNamespaceManager(env).releaseExclusiveLock();
|
||||
}
|
||||
env.getProcedureQueue().releaseNamespaceExclusiveLock(getNamespaceName());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -229,6 +225,10 @@ public class CreateNamespaceProcedure
|
|||
return TableOperationType.EDIT;
|
||||
}
|
||||
|
||||
private String getNamespaceName() {
|
||||
return nsDescriptor.getName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Action before any real action of creating namespace.
|
||||
* @param env MasterProcedureEnv
|
||||
|
|
|
@ -104,7 +104,7 @@ public class DeleteNamespaceProcedure
|
|||
throw new UnsupportedOperationException(this + " unhandled state=" + state);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Error trying to delete the namespace" + namespaceName
|
||||
LOG.warn("Error trying to delete the namespace " + namespaceName
|
||||
+ " (in state=" + state + ")", e);
|
||||
|
||||
setFailure("master-delete-namespace", e);
|
||||
|
@ -212,12 +212,13 @@ public class DeleteNamespaceProcedure
|
|||
|
||||
@Override
|
||||
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||
return getTableNamespaceManager(env).acquireExclusiveLock();
|
||||
if (env.waitInitialized(this)) return false;
|
||||
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(getNamespaceName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void releaseLock(final MasterProcedureEnv env) {
|
||||
getTableNamespaceManager(env).releaseExclusiveLock();
|
||||
env.getProcedureQueue().releaseNamespaceExclusiveLock(getNamespaceName());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -230,6 +231,10 @@ public class DeleteNamespaceProcedure
|
|||
return TableOperationType.EDIT;
|
||||
}
|
||||
|
||||
private String getNamespaceName() {
|
||||
return namespaceName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Action before any real action of deleting namespace.
|
||||
* @param env MasterProcedureEnv
|
||||
|
|
|
@ -309,9 +309,9 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
if (!suspendQueue) suspendQueue = true;
|
||||
|
||||
if (isTableProcedure(procedure)) {
|
||||
suspendTableQueue(event, getTableName(procedure));
|
||||
waitTableEvent(event, procedure, suspendQueue);
|
||||
} else if (isServerProcedure(procedure)) {
|
||||
suspendServerQueue(event, getServerName(procedure));
|
||||
waitServerEvent(event, procedure, suspendQueue);
|
||||
} else {
|
||||
// TODO: at the moment we only have Table and Server procedures
|
||||
// if you are implementing a non-table/non-server procedure, you have two options: create
|
||||
|
@ -324,15 +324,21 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
return true;
|
||||
}
|
||||
|
||||
private void suspendTableQueue(ProcedureEvent event, TableName tableName) {
|
||||
private void waitTableEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) {
|
||||
final TableName tableName = getTableName(procedure);
|
||||
final boolean isDebugEnabled = LOG.isDebugEnabled();
|
||||
|
||||
schedLock.lock();
|
||||
try {
|
||||
TableQueue queue = getTableQueue(tableName);
|
||||
if (!queue.setSuspended(true)) return;
|
||||
if (queue.isSuspended()) return;
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
// TODO: if !suspendQueue
|
||||
|
||||
if (isDebugEnabled) {
|
||||
LOG.debug("Suspend table queue " + tableName);
|
||||
}
|
||||
queue.setSuspended(true);
|
||||
removeFromRunQueue(tableRunQueue, queue);
|
||||
event.suspendTableQueue(queue);
|
||||
} finally {
|
||||
|
@ -340,16 +346,22 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
}
|
||||
}
|
||||
|
||||
private void suspendServerQueue(ProcedureEvent event, ServerName serverName) {
|
||||
private void waitServerEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) {
|
||||
final ServerName serverName = getServerName(procedure);
|
||||
final boolean isDebugEnabled = LOG.isDebugEnabled();
|
||||
|
||||
schedLock.lock();
|
||||
try {
|
||||
// TODO: This will change once we have the new AM
|
||||
ServerQueue queue = getServerQueue(serverName);
|
||||
if (!queue.setSuspended(true)) return;
|
||||
if (queue.isSuspended()) return;
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
// TODO: if !suspendQueue
|
||||
|
||||
if (isDebugEnabled) {
|
||||
LOG.debug("Suspend server queue " + serverName);
|
||||
}
|
||||
queue.setSuspended(true);
|
||||
removeFromRunQueue(serverRunQueue, queue);
|
||||
event.suspendServerQueue(queue);
|
||||
} finally {
|
||||
|
@ -358,18 +370,20 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
}
|
||||
|
||||
public void suspend(ProcedureEvent event) {
|
||||
final boolean isDebugEnabled = LOG.isDebugEnabled();
|
||||
synchronized (event) {
|
||||
event.setReady(false);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (isDebugEnabled) {
|
||||
LOG.debug("Suspend event " + event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void wake(ProcedureEvent event) {
|
||||
final boolean isDebugEnabled = LOG.isDebugEnabled();
|
||||
synchronized (event) {
|
||||
event.setReady(true);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (isDebugEnabled) {
|
||||
LOG.debug("Wake event " + event);
|
||||
}
|
||||
|
||||
|
@ -467,7 +481,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
Queue<TableName> node = AvlTree.get(tableMap, tableName);
|
||||
if (node != null) return (TableQueue)node;
|
||||
|
||||
node = new TableQueue(tableName, getTablePriority(tableName));
|
||||
NamespaceQueue nsQueue = getNamespaceQueue(tableName.getNamespaceAsString());
|
||||
node = new TableQueue(tableName, nsQueue, getTablePriority(tableName));
|
||||
tableMap = AvlTree.insert(tableMap, node);
|
||||
return (TableQueue)node;
|
||||
}
|
||||
|
@ -493,6 +508,18 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
return ((TableProcedureInterface)proc).getTableName();
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Namespace Queue Lookup Helpers
|
||||
// ============================================================================
|
||||
private NamespaceQueue getNamespaceQueue(String namespace) {
|
||||
Queue<String> node = AvlTree.get(namespaceMap, namespace);
|
||||
if (node != null) return (NamespaceQueue)node;
|
||||
|
||||
node = new NamespaceQueue(namespace);
|
||||
namespaceMap = AvlTree.insert(namespaceMap, node);
|
||||
return (NamespaceQueue)node;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Server Queue Lookup Helpers
|
||||
// ============================================================================
|
||||
|
@ -559,10 +586,22 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
}
|
||||
|
||||
public static class TableQueue extends QueueImpl<TableName> {
|
||||
private final NamespaceQueue namespaceQueue;
|
||||
|
||||
private TableLock tableLock = null;
|
||||
|
||||
public TableQueue(TableName tableName, int priority) {
|
||||
public TableQueue(TableName tableName, NamespaceQueue namespaceQueue, int priority) {
|
||||
super(tableName, priority);
|
||||
this.namespaceQueue = namespaceQueue;
|
||||
}
|
||||
|
||||
public NamespaceQueue getNamespaceQueue() {
|
||||
return namespaceQueue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean isAvailable() {
|
||||
return super.isAvailable() && !namespaceQueue.hasExclusiveLock();
|
||||
}
|
||||
|
||||
// TODO: We can abort pending/in-progress operation if the new call is
|
||||
|
@ -584,9 +623,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
case CREATE:
|
||||
case DELETE:
|
||||
case DISABLE:
|
||||
case EDIT:
|
||||
case ENABLE:
|
||||
return true;
|
||||
case EDIT:
|
||||
// we allow concurrent edit on the NS table
|
||||
return !tpi.getTableName().equals(TableName.NAMESPACE_TABLE_NAME);
|
||||
case READ:
|
||||
return false;
|
||||
default:
|
||||
|
@ -595,10 +636,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
throw new UnsupportedOperationException("unexpected type " + tpi.getTableOperationType());
|
||||
}
|
||||
|
||||
private synchronized boolean trySharedLock(final TableLockManager lockManager,
|
||||
private synchronized boolean tryZkSharedLock(final TableLockManager lockManager,
|
||||
final String purpose) {
|
||||
if (hasExclusiveLock()) return false;
|
||||
|
||||
// Take zk-read-lock
|
||||
TableName tableName = getKey();
|
||||
tableLock = lockManager.readLock(tableName, purpose);
|
||||
|
@ -609,14 +648,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
tableLock = null;
|
||||
return false;
|
||||
}
|
||||
|
||||
trySharedLock();
|
||||
return true;
|
||||
}
|
||||
|
||||
private synchronized void releaseSharedLock(final TableLockManager lockManager) {
|
||||
private synchronized void releaseZkSharedLock(final TableLockManager lockManager) {
|
||||
releaseTableLock(lockManager, isSingleSharedLock());
|
||||
releaseSharedLock();
|
||||
}
|
||||
|
||||
private synchronized boolean tryZkExclusiveLock(final TableLockManager lockManager,
|
||||
|
@ -653,8 +689,44 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* the namespace is currently used just as a rwlock, not as a queue.
|
||||
* because ns operation are not frequent enough. so we want to avoid
|
||||
* having to move table queues around for suspend/resume.
|
||||
*/
|
||||
private static class NamespaceQueue extends Queue<String> {
|
||||
public NamespaceQueue(String namespace) {
|
||||
super(namespace);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean requireExclusiveLock(Procedure proc) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void add(final Procedure proc, final boolean addToFront) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Procedure poll() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Locking Helpers
|
||||
// Table Locking Helpers
|
||||
// ============================================================================
|
||||
/**
|
||||
* Try to acquire the exclusive lock on the specified table.
|
||||
|
@ -666,8 +738,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) {
|
||||
schedLock.lock();
|
||||
TableQueue queue = getTableQueue(table);
|
||||
boolean hasXLock = queue.tryExclusiveLock();
|
||||
if (!hasXLock) {
|
||||
if (!queue.getNamespaceQueue().trySharedLock()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!queue.tryExclusiveLock()) {
|
||||
queue.getNamespaceQueue().releaseSharedLock();
|
||||
schedLock.unlock();
|
||||
return false;
|
||||
}
|
||||
|
@ -676,10 +752,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
schedLock.unlock();
|
||||
|
||||
// Zk lock is expensive...
|
||||
hasXLock = queue.tryZkExclusiveLock(lockManager, purpose);
|
||||
boolean hasXLock = queue.tryZkExclusiveLock(lockManager, purpose);
|
||||
if (!hasXLock) {
|
||||
schedLock.lock();
|
||||
queue.releaseExclusiveLock();
|
||||
queue.getNamespaceQueue().releaseSharedLock();
|
||||
addToRunQueue(tableRunQueue, queue);
|
||||
schedLock.unlock();
|
||||
}
|
||||
|
@ -700,6 +777,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
|
||||
schedLock.lock();
|
||||
queue.releaseExclusiveLock();
|
||||
queue.getNamespaceQueue().releaseSharedLock();
|
||||
addToRunQueue(tableRunQueue, queue);
|
||||
schedLock.unlock();
|
||||
}
|
||||
|
@ -712,7 +790,29 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
* @return true if we were able to acquire the lock on the table, otherwise false.
|
||||
*/
|
||||
public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) {
|
||||
return getTableQueueWithLock(table).trySharedLock(lockManager, purpose);
|
||||
schedLock.lock();
|
||||
TableQueue queue = getTableQueue(table);
|
||||
if (!queue.getNamespaceQueue().trySharedLock()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!queue.trySharedLock()) {
|
||||
queue.getNamespaceQueue().releaseSharedLock();
|
||||
schedLock.unlock();
|
||||
return false;
|
||||
}
|
||||
|
||||
schedLock.unlock();
|
||||
|
||||
// Zk lock is expensive...
|
||||
boolean hasXLock = queue.tryZkSharedLock(lockManager, purpose);
|
||||
if (!hasXLock) {
|
||||
schedLock.lock();
|
||||
queue.releaseSharedLock();
|
||||
queue.getNamespaceQueue().releaseSharedLock();
|
||||
schedLock.unlock();
|
||||
}
|
||||
return hasXLock;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -720,7 +820,17 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
* @param table the name of the table that has the shared lock
|
||||
*/
|
||||
public void releaseTableSharedLock(final TableName table) {
|
||||
getTableQueueWithLock(table).releaseSharedLock(lockManager);
|
||||
schedLock.lock();
|
||||
TableQueue queue = getTableQueue(table);
|
||||
schedLock.unlock();
|
||||
|
||||
// Zk lock is expensive...
|
||||
queue.releaseZkSharedLock(lockManager);
|
||||
|
||||
schedLock.lock();
|
||||
queue.releaseSharedLock();
|
||||
queue.getNamespaceQueue().releaseSharedLock();
|
||||
schedLock.unlock();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -762,13 +872,58 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
return true;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Namespace Locking Helpers
|
||||
// ============================================================================
|
||||
/**
|
||||
* Try to acquire the exclusive lock on the specified namespace.
|
||||
* @see #releaseNamespaceExclusiveLock(String)
|
||||
* @param nsName Namespace to lock
|
||||
* @return true if we were able to acquire the lock on the namespace, otherwise false.
|
||||
*/
|
||||
public boolean tryAcquireNamespaceExclusiveLock(final String nsName) {
|
||||
schedLock.lock();
|
||||
try {
|
||||
TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
|
||||
if (!tableQueue.trySharedLock()) return false;
|
||||
|
||||
NamespaceQueue nsQueue = getNamespaceQueue(nsName);
|
||||
boolean hasLock = nsQueue.tryExclusiveLock();
|
||||
if (!hasLock) {
|
||||
tableQueue.releaseSharedLock();
|
||||
}
|
||||
return hasLock;
|
||||
} finally {
|
||||
schedLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Release the exclusive lock
|
||||
* @see #tryAcquireNamespaceExclusiveLock(String)
|
||||
* @param nsName the namespace that has the exclusive lock
|
||||
*/
|
||||
public void releaseNamespaceExclusiveLock(final String nsName) {
|
||||
schedLock.lock();
|
||||
try {
|
||||
TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
|
||||
tableQueue.releaseSharedLock();
|
||||
|
||||
NamespaceQueue queue = getNamespaceQueue(nsName);
|
||||
queue.releaseExclusiveLock();
|
||||
} finally {
|
||||
schedLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Server Locking Helpers
|
||||
// ============================================================================
|
||||
/**
|
||||
* Release the exclusive lock
|
||||
* @see #tryAcquireServerExclusiveLock(ServerName)
|
||||
* @param serverName the server that has the exclusive lock
|
||||
* Try to acquire the exclusive lock on the specified server.
|
||||
* @see #releaseServerExclusiveLock(ServerName)
|
||||
* @param serverName Server to lock
|
||||
* @return true if we were able to acquire the lock on the server, otherwise false.
|
||||
*/
|
||||
public boolean tryAcquireServerExclusiveLock(final ServerName serverName) {
|
||||
schedLock.lock();
|
||||
|
|
|
@ -192,12 +192,13 @@ public class ModifyNamespaceProcedure
|
|||
|
||||
@Override
|
||||
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||
return getTableNamespaceManager(env).acquireExclusiveLock();
|
||||
if (env.waitInitialized(this)) return false;
|
||||
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(getNamespaceName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void releaseLock(final MasterProcedureEnv env) {
|
||||
getTableNamespaceManager(env).releaseExclusiveLock();
|
||||
env.getProcedureQueue().releaseNamespaceExclusiveLock(getNamespaceName());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -210,6 +211,10 @@ public class ModifyNamespaceProcedure
|
|||
return TableOperationType.EDIT;
|
||||
}
|
||||
|
||||
private String getNamespaceName() {
|
||||
return newNsDescriptor.getName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Action before any real action of adding namespace.
|
||||
* @param env MasterProcedureEnv
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.After;
|
||||
|
@ -289,6 +290,58 @@ public class TestMasterProcedureScheduler {
|
|||
assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVerifyNamespaceRwLocks() throws Exception {
|
||||
String nsName1 = "ns1";
|
||||
String nsName2 = "ns2";
|
||||
TableName tableName1 = TableName.valueOf(nsName1, "testtb");
|
||||
TableName tableName2 = TableName.valueOf(nsName2, "testtb");
|
||||
queue.addBack(new TestNamespaceProcedure(1, nsName1,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(new TestTableProcedure(2, tableName1,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(new TestTableProcedure(3, tableName2,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(new TestNamespaceProcedure(4, nsName2,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
|
||||
// Fetch the 1st item and take the write lock
|
||||
long procId = queue.poll().getProcId();
|
||||
assertEquals(1, procId);
|
||||
assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(nsName1));
|
||||
|
||||
// System tables have 2 as default priority
|
||||
Procedure proc = queue.poll();
|
||||
assertEquals(4, proc.getProcId());
|
||||
assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(nsName2));
|
||||
queue.releaseNamespaceExclusiveLock(nsName2);
|
||||
queue.yield(proc);
|
||||
|
||||
// table on ns1 is locked, so we get table on ns2
|
||||
procId = queue.poll().getProcId();
|
||||
assertEquals(3, procId);
|
||||
assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName2, "lock " + procId));
|
||||
|
||||
// ns2 is not available (TODO we may avoid this one)
|
||||
proc = queue.poll();
|
||||
assertEquals(4, proc.getProcId());
|
||||
assertEquals(false, queue.tryAcquireNamespaceExclusiveLock(nsName2));
|
||||
queue.yield(proc);
|
||||
|
||||
// release the ns1 lock
|
||||
queue.releaseNamespaceExclusiveLock(nsName1);
|
||||
|
||||
// we are now able to execute table of ns1
|
||||
procId = queue.poll().getProcId();
|
||||
assertEquals(2, procId);
|
||||
|
||||
queue.releaseTableExclusiveLock(tableName2);
|
||||
|
||||
// we are now able to execute ns2
|
||||
procId = queue.poll().getProcId();
|
||||
assertEquals(4, procId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that "write" operations for a single table are serialized,
|
||||
* but different tables can be executed in parallel.
|
||||
|
@ -438,7 +491,7 @@ public class TestMasterProcedureScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
public static class TestTableProcedure extends Procedure<Void>
|
||||
public static class TestTableProcedure extends TestProcedure
|
||||
implements TableProcedureInterface {
|
||||
private final TableOperationType opType;
|
||||
private final TableName tableName;
|
||||
|
@ -448,9 +501,9 @@ public class TestMasterProcedureScheduler {
|
|||
}
|
||||
|
||||
public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) {
|
||||
super(procId);
|
||||
this.tableName = tableName;
|
||||
this.opType = opType;
|
||||
setProcId(procId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -462,26 +515,31 @@ public class TestMasterProcedureScheduler {
|
|||
public TableOperationType getTableOperationType() {
|
||||
return opType;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Procedure[] execute(Void env) {
|
||||
return null;
|
||||
public static class TestNamespaceProcedure extends TestProcedure
|
||||
implements TableProcedureInterface {
|
||||
private final TableOperationType opType;
|
||||
private final String nsName;
|
||||
|
||||
public TestNamespaceProcedure() {
|
||||
throw new UnsupportedOperationException("recovery should not be triggered here");
|
||||
}
|
||||
|
||||
public TestNamespaceProcedure(long procId, String nsName, TableOperationType opType) {
|
||||
super(procId);
|
||||
this.nsName = nsName;
|
||||
this.opType = opType;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollback(Void env) {
|
||||
throw new UnsupportedOperationException();
|
||||
public TableName getTableName() {
|
||||
return TableName.NAMESPACE_TABLE_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean abort(Void env) {
|
||||
throw new UnsupportedOperationException();
|
||||
public TableOperationType getTableOperationType() {
|
||||
return opType;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(final OutputStream stream) throws IOException {}
|
||||
|
||||
@Override
|
||||
protected void deserializeStateData(final InputStream stream) throws IOException {}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue