HBASE-15106 Procedure v2 - Procedure Queue pass Procedure for better debuggability
This commit is contained in:
parent
eb17f74b9e
commit
713c6b5b1e
|
@ -185,12 +185,12 @@ public class AddColumnFamilyProcedure
|
||||||
@Override
|
@Override
|
||||||
protected boolean acquireLock(final MasterProcedureEnv env) {
|
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||||
if (env.waitInitialized(this)) return false;
|
if (env.waitInitialized(this)) return false;
|
||||||
return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "add family");
|
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void releaseLock(final MasterProcedureEnv env) {
|
protected void releaseLock(final MasterProcedureEnv env) {
|
||||||
env.getProcedureQueue().releaseTableExclusiveLock(tableName);
|
env.getProcedureQueue().releaseTableExclusiveLock(this, tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -207,12 +207,12 @@ public class CreateNamespaceProcedure
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(getNamespaceName());
|
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(this, getNamespaceName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void releaseLock(final MasterProcedureEnv env) {
|
protected void releaseLock(final MasterProcedureEnv env) {
|
||||||
env.getProcedureQueue().releaseNamespaceExclusiveLock(getNamespaceName());
|
env.getProcedureQueue().releaseNamespaceExclusiveLock(this, getNamespaceName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -269,12 +269,12 @@ public class CreateTableProcedure
|
||||||
if (!getTableName().isSystemTable() && env.waitInitialized(this)) {
|
if (!getTableName().isSystemTable() && env.waitInitialized(this)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "create table");
|
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void releaseLock(final MasterProcedureEnv env) {
|
protected void releaseLock(final MasterProcedureEnv env) {
|
||||||
env.getProcedureQueue().releaseTableExclusiveLock(getTableName());
|
env.getProcedureQueue().releaseTableExclusiveLock(this, getTableName());
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean prepareCreate(final MasterProcedureEnv env) throws IOException {
|
private boolean prepareCreate(final MasterProcedureEnv env) throws IOException {
|
||||||
|
|
|
@ -202,12 +202,12 @@ public class DeleteColumnFamilyProcedure
|
||||||
@Override
|
@Override
|
||||||
protected boolean acquireLock(final MasterProcedureEnv env) {
|
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||||
if (env.waitInitialized(this)) return false;
|
if (env.waitInitialized(this)) return false;
|
||||||
return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "delete family");
|
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void releaseLock(final MasterProcedureEnv env) {
|
protected void releaseLock(final MasterProcedureEnv env) {
|
||||||
env.getProcedureQueue().releaseTableExclusiveLock(tableName);
|
env.getProcedureQueue().releaseTableExclusiveLock(this, tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -213,12 +213,12 @@ public class DeleteNamespaceProcedure
|
||||||
@Override
|
@Override
|
||||||
protected boolean acquireLock(final MasterProcedureEnv env) {
|
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||||
if (env.waitInitialized(this)) return false;
|
if (env.waitInitialized(this)) return false;
|
||||||
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(getNamespaceName());
|
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(this, getNamespaceName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void releaseLock(final MasterProcedureEnv env) {
|
protected void releaseLock(final MasterProcedureEnv env) {
|
||||||
env.getProcedureQueue().releaseNamespaceExclusiveLock(getNamespaceName());
|
env.getProcedureQueue().releaseNamespaceExclusiveLock(this, getNamespaceName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -201,12 +201,12 @@ public class DeleteTableProcedure
|
||||||
@Override
|
@Override
|
||||||
protected boolean acquireLock(final MasterProcedureEnv env) {
|
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||||
if (env.waitInitialized(this)) return false;
|
if (env.waitInitialized(this)) return false;
|
||||||
return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "delete table");
|
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void releaseLock(final MasterProcedureEnv env) {
|
protected void releaseLock(final MasterProcedureEnv env) {
|
||||||
env.getProcedureQueue().releaseTableExclusiveLock(getTableName());
|
env.getProcedureQueue().releaseTableExclusiveLock(this, getTableName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -215,12 +215,12 @@ public class DisableTableProcedure
|
||||||
@Override
|
@Override
|
||||||
protected boolean acquireLock(final MasterProcedureEnv env) {
|
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||||
if (env.waitInitialized(this)) return false;
|
if (env.waitInitialized(this)) return false;
|
||||||
return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "disable table");
|
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void releaseLock(final MasterProcedureEnv env) {
|
protected void releaseLock(final MasterProcedureEnv env) {
|
||||||
env.getProcedureQueue().releaseTableExclusiveLock(tableName);
|
env.getProcedureQueue().releaseTableExclusiveLock(this, tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -235,12 +235,12 @@ public class EnableTableProcedure
|
||||||
@Override
|
@Override
|
||||||
protected boolean acquireLock(final MasterProcedureEnv env) {
|
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||||
if (env.waitInitialized(this)) return false;
|
if (env.waitInitialized(this)) return false;
|
||||||
return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "enable table");
|
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void releaseLock(final MasterProcedureEnv env) {
|
protected void releaseLock(final MasterProcedureEnv env) {
|
||||||
env.getProcedureQueue().releaseTableExclusiveLock(tableName);
|
env.getProcedureQueue().releaseTableExclusiveLock(this, tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -709,6 +709,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Procedure peek() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Procedure poll() {
|
public Procedure poll() {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
|
@ -731,18 +736,18 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
||||||
/**
|
/**
|
||||||
* Try to acquire the exclusive lock on the specified table.
|
* Try to acquire the exclusive lock on the specified table.
|
||||||
* other operations in the table-queue will be executed after the lock is released.
|
* other operations in the table-queue will be executed after the lock is released.
|
||||||
|
* @param procedure the procedure trying to acquire the lock
|
||||||
* @param table Table to lock
|
* @param table Table to lock
|
||||||
* @param purpose Human readable reason for locking the table
|
|
||||||
* @return true if we were able to acquire the lock on the table, otherwise false.
|
* @return true if we were able to acquire the lock on the table, otherwise false.
|
||||||
*/
|
*/
|
||||||
public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) {
|
public boolean tryAcquireTableExclusiveLock(final Procedure procedure, final TableName table) {
|
||||||
schedLock.lock();
|
schedLock.lock();
|
||||||
TableQueue queue = getTableQueue(table);
|
TableQueue queue = getTableQueue(table);
|
||||||
if (!queue.getNamespaceQueue().trySharedLock()) {
|
if (!queue.getNamespaceQueue().trySharedLock()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!queue.tryExclusiveLock()) {
|
if (!queue.tryExclusiveLock(procedure.getProcId())) {
|
||||||
queue.getNamespaceQueue().releaseSharedLock();
|
queue.getNamespaceQueue().releaseSharedLock();
|
||||||
schedLock.unlock();
|
schedLock.unlock();
|
||||||
return false;
|
return false;
|
||||||
|
@ -752,7 +757,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
||||||
schedLock.unlock();
|
schedLock.unlock();
|
||||||
|
|
||||||
// Zk lock is expensive...
|
// Zk lock is expensive...
|
||||||
boolean hasXLock = queue.tryZkExclusiveLock(lockManager, purpose);
|
boolean hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString());
|
||||||
if (!hasXLock) {
|
if (!hasXLock) {
|
||||||
schedLock.lock();
|
schedLock.lock();
|
||||||
queue.releaseExclusiveLock();
|
queue.releaseExclusiveLock();
|
||||||
|
@ -765,9 +770,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Release the exclusive lock taken with tryAcquireTableWrite()
|
* Release the exclusive lock taken with tryAcquireTableWrite()
|
||||||
|
* @param procedure the procedure releasing the lock
|
||||||
* @param table the name of the table that has the exclusive lock
|
* @param table the name of the table that has the exclusive lock
|
||||||
*/
|
*/
|
||||||
public void releaseTableExclusiveLock(final TableName table) {
|
public void releaseTableExclusiveLock(final Procedure procedure, final TableName table) {
|
||||||
schedLock.lock();
|
schedLock.lock();
|
||||||
TableQueue queue = getTableQueue(table);
|
TableQueue queue = getTableQueue(table);
|
||||||
schedLock.unlock();
|
schedLock.unlock();
|
||||||
|
@ -785,44 +791,48 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
||||||
/**
|
/**
|
||||||
* Try to acquire the shared lock on the specified table.
|
* Try to acquire the shared lock on the specified table.
|
||||||
* other "read" operations in the table-queue may be executed concurrently,
|
* other "read" operations in the table-queue may be executed concurrently,
|
||||||
|
* @param procedure the procedure trying to acquire the lock
|
||||||
* @param table Table to lock
|
* @param table Table to lock
|
||||||
* @param purpose Human readable reason for locking the table
|
|
||||||
* @return true if we were able to acquire the lock on the table, otherwise false.
|
* @return true if we were able to acquire the lock on the table, otherwise false.
|
||||||
*/
|
*/
|
||||||
public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) {
|
public boolean tryAcquireTableSharedLock(final Procedure procedure, final TableName table) {
|
||||||
|
return tryAcquireTableQueueSharedLock(procedure, table) != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TableQueue tryAcquireTableQueueSharedLock(final Procedure procedure,
|
||||||
|
final TableName table) {
|
||||||
schedLock.lock();
|
schedLock.lock();
|
||||||
TableQueue queue = getTableQueue(table);
|
TableQueue queue = getTableQueue(table);
|
||||||
if (!queue.getNamespaceQueue().trySharedLock()) {
|
if (!queue.getNamespaceQueue().trySharedLock()) {
|
||||||
return false;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!queue.trySharedLock()) {
|
if (!queue.trySharedLock()) {
|
||||||
queue.getNamespaceQueue().releaseSharedLock();
|
queue.getNamespaceQueue().releaseSharedLock();
|
||||||
schedLock.unlock();
|
schedLock.unlock();
|
||||||
return false;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
schedLock.unlock();
|
schedLock.unlock();
|
||||||
|
|
||||||
// Zk lock is expensive...
|
// Zk lock is expensive...
|
||||||
boolean hasXLock = queue.tryZkSharedLock(lockManager, purpose);
|
if (!queue.tryZkSharedLock(lockManager, procedure.toString())) {
|
||||||
if (!hasXLock) {
|
|
||||||
schedLock.lock();
|
schedLock.lock();
|
||||||
queue.releaseSharedLock();
|
queue.releaseSharedLock();
|
||||||
queue.getNamespaceQueue().releaseSharedLock();
|
queue.getNamespaceQueue().releaseSharedLock();
|
||||||
schedLock.unlock();
|
schedLock.unlock();
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
return hasXLock;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Release the shared lock taken with tryAcquireTableRead()
|
* Release the shared lock taken with tryAcquireTableRead()
|
||||||
|
* @param procedure the procedure releasing the lock
|
||||||
* @param table the name of the table that has the shared lock
|
* @param table the name of the table that has the shared lock
|
||||||
*/
|
*/
|
||||||
public void releaseTableSharedLock(final TableName table) {
|
public void releaseTableSharedLock(final Procedure procedure, final TableName table) {
|
||||||
schedLock.lock();
|
final TableQueue queue = getTableQueueWithLock(table);
|
||||||
TableQueue queue = getTableQueue(table);
|
|
||||||
schedLock.unlock();
|
|
||||||
|
|
||||||
// Zk lock is expensive...
|
// Zk lock is expensive...
|
||||||
queue.releaseZkSharedLock(lockManager);
|
queue.releaseZkSharedLock(lockManager);
|
||||||
|
@ -848,7 +858,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
||||||
TableQueue queue = getTableQueue(table);
|
TableQueue queue = getTableQueue(table);
|
||||||
if (queue == null) return true;
|
if (queue == null) return true;
|
||||||
|
|
||||||
if (queue.isEmpty() && queue.acquireDeleteLock()) {
|
if (queue.isEmpty() && queue.tryExclusiveLock(0)) {
|
||||||
// remove the table from the run-queue and the map
|
// remove the table from the run-queue and the map
|
||||||
if (IterableList.isLinked(queue)) {
|
if (IterableList.isLinked(queue)) {
|
||||||
tableRunQueue.remove(queue);
|
tableRunQueue.remove(queue);
|
||||||
|
@ -877,18 +887,19 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
/**
|
/**
|
||||||
* Try to acquire the exclusive lock on the specified namespace.
|
* Try to acquire the exclusive lock on the specified namespace.
|
||||||
* @see #releaseNamespaceExclusiveLock(String)
|
* @see #releaseNamespaceExclusiveLock(Procedure,String)
|
||||||
|
* @param procedure the procedure trying to acquire the lock
|
||||||
* @param nsName Namespace to lock
|
* @param nsName Namespace to lock
|
||||||
* @return true if we were able to acquire the lock on the namespace, otherwise false.
|
* @return true if we were able to acquire the lock on the namespace, otherwise false.
|
||||||
*/
|
*/
|
||||||
public boolean tryAcquireNamespaceExclusiveLock(final String nsName) {
|
public boolean tryAcquireNamespaceExclusiveLock(final Procedure procedure, final String nsName) {
|
||||||
schedLock.lock();
|
schedLock.lock();
|
||||||
try {
|
try {
|
||||||
TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
|
TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
|
||||||
if (!tableQueue.trySharedLock()) return false;
|
if (!tableQueue.trySharedLock()) return false;
|
||||||
|
|
||||||
NamespaceQueue nsQueue = getNamespaceQueue(nsName);
|
NamespaceQueue nsQueue = getNamespaceQueue(nsName);
|
||||||
boolean hasLock = nsQueue.tryExclusiveLock();
|
boolean hasLock = nsQueue.tryExclusiveLock(procedure.getProcId());
|
||||||
if (!hasLock) {
|
if (!hasLock) {
|
||||||
tableQueue.releaseSharedLock();
|
tableQueue.releaseSharedLock();
|
||||||
}
|
}
|
||||||
|
@ -900,10 +911,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Release the exclusive lock
|
* Release the exclusive lock
|
||||||
* @see #tryAcquireNamespaceExclusiveLock(String)
|
* @see #tryAcquireNamespaceExclusiveLock(Procedure,String)
|
||||||
|
* @param procedure the procedure releasing the lock
|
||||||
* @param nsName the namespace that has the exclusive lock
|
* @param nsName the namespace that has the exclusive lock
|
||||||
*/
|
*/
|
||||||
public void releaseNamespaceExclusiveLock(final String nsName) {
|
public void releaseNamespaceExclusiveLock(final Procedure procedure, final String nsName) {
|
||||||
schedLock.lock();
|
schedLock.lock();
|
||||||
try {
|
try {
|
||||||
TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
|
TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
|
||||||
|
@ -921,15 +933,17 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
/**
|
/**
|
||||||
* Try to acquire the exclusive lock on the specified server.
|
* Try to acquire the exclusive lock on the specified server.
|
||||||
* @see #releaseServerExclusiveLock(ServerName)
|
* @see #releaseServerExclusiveLock(Procedure,ServerName)
|
||||||
|
* @param procedure the procedure trying to acquire the lock
|
||||||
* @param serverName Server to lock
|
* @param serverName Server to lock
|
||||||
* @return true if we were able to acquire the lock on the server, otherwise false.
|
* @return true if we were able to acquire the lock on the server, otherwise false.
|
||||||
*/
|
*/
|
||||||
public boolean tryAcquireServerExclusiveLock(final ServerName serverName) {
|
public boolean tryAcquireServerExclusiveLock(final Procedure procedure,
|
||||||
|
final ServerName serverName) {
|
||||||
schedLock.lock();
|
schedLock.lock();
|
||||||
try {
|
try {
|
||||||
ServerQueue queue = getServerQueue(serverName);
|
ServerQueue queue = getServerQueue(serverName);
|
||||||
if (queue.tryExclusiveLock()) {
|
if (queue.tryExclusiveLock(procedure.getProcId())) {
|
||||||
removeFromRunQueue(serverRunQueue, queue);
|
removeFromRunQueue(serverRunQueue, queue);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -941,10 +955,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Release the exclusive lock
|
* Release the exclusive lock
|
||||||
* @see #tryAcquireServerExclusiveLock(ServerName)
|
* @see #tryAcquireServerExclusiveLock(Procedure,ServerName)
|
||||||
|
* @param procedure the procedure releasing the lock
|
||||||
* @param serverName the server that has the exclusive lock
|
* @param serverName the server that has the exclusive lock
|
||||||
*/
|
*/
|
||||||
public void releaseServerExclusiveLock(final ServerName serverName) {
|
public void releaseServerExclusiveLock(final Procedure procedure,
|
||||||
|
final ServerName serverName) {
|
||||||
schedLock.lock();
|
schedLock.lock();
|
||||||
try {
|
try {
|
||||||
ServerQueue queue = getServerQueue(serverName);
|
ServerQueue queue = getServerQueue(serverName);
|
||||||
|
@ -957,20 +973,24 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Try to acquire the shared lock on the specified server.
|
* Try to acquire the shared lock on the specified server.
|
||||||
* @see #releaseServerSharedLock(ServerName)
|
* @see #releaseServerSharedLock(Procedure,ServerName)
|
||||||
|
* @param procedure the procedure releasing the lock
|
||||||
* @param serverName Server to lock
|
* @param serverName Server to lock
|
||||||
* @return true if we were able to acquire the lock on the server, otherwise false.
|
* @return true if we were able to acquire the lock on the server, otherwise false.
|
||||||
*/
|
*/
|
||||||
public boolean tryAcquireServerSharedLock(final ServerName serverName) {
|
public boolean tryAcquireServerSharedLock(final Procedure procedure,
|
||||||
|
final ServerName serverName) {
|
||||||
return getServerQueueWithLock(serverName).trySharedLock();
|
return getServerQueueWithLock(serverName).trySharedLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Release the shared lock taken
|
* Release the shared lock taken
|
||||||
* @see #tryAcquireServerSharedLock(ServerName)
|
* @see #tryAcquireServerSharedLock(Procedure,ServerName)
|
||||||
|
* @param procedure the procedure releasing the lock
|
||||||
* @param serverName the server that has the shared lock
|
* @param serverName the server that has the shared lock
|
||||||
*/
|
*/
|
||||||
public void releaseServerSharedLock(final ServerName serverName) {
|
public void releaseServerSharedLock(final Procedure procedure,
|
||||||
|
final ServerName serverName) {
|
||||||
getServerQueueWithLock(serverName).releaseSharedLock();
|
getServerQueueWithLock(serverName).releaseSharedLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -981,8 +1001,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
||||||
boolean isAvailable();
|
boolean isAvailable();
|
||||||
boolean isEmpty();
|
boolean isEmpty();
|
||||||
int size();
|
int size();
|
||||||
|
|
||||||
void add(Procedure proc, boolean addFront);
|
void add(Procedure proc, boolean addFront);
|
||||||
boolean requireExclusiveLock(Procedure proc);
|
boolean requireExclusiveLock(Procedure proc);
|
||||||
|
Procedure peek();
|
||||||
Procedure poll();
|
Procedure poll();
|
||||||
|
|
||||||
boolean isSuspended();
|
boolean isSuspended();
|
||||||
|
@ -997,7 +1019,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
||||||
private Queue<TKey> iterPrev = null;
|
private Queue<TKey> iterPrev = null;
|
||||||
private boolean suspended = false;
|
private boolean suspended = false;
|
||||||
|
|
||||||
private boolean exclusiveLock = false;
|
private long exclusiveLockProcIdOwner = Long.MIN_VALUE;
|
||||||
private int sharedLock = 0;
|
private int sharedLock = 0;
|
||||||
|
|
||||||
private final TKey key;
|
private final TKey key;
|
||||||
|
@ -1041,7 +1063,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized boolean hasExclusiveLock() {
|
public synchronized boolean hasExclusiveLock() {
|
||||||
return this.exclusiveLock;
|
return this.exclusiveLockProcIdOwner != Long.MIN_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized boolean trySharedLock() {
|
public synchronized boolean trySharedLock() {
|
||||||
|
@ -1058,24 +1080,21 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
||||||
return sharedLock == 1;
|
return sharedLock == 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized boolean tryExclusiveLock() {
|
public synchronized boolean tryExclusiveLock(long procIdOwner) {
|
||||||
|
assert procIdOwner != Long.MIN_VALUE;
|
||||||
if (isLocked()) return false;
|
if (isLocked()) return false;
|
||||||
exclusiveLock = true;
|
exclusiveLockProcIdOwner = procIdOwner;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void releaseExclusiveLock() {
|
public synchronized void releaseExclusiveLock() {
|
||||||
exclusiveLock = false;
|
exclusiveLockProcIdOwner = Long.MIN_VALUE;
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized boolean acquireDeleteLock() {
|
|
||||||
return tryExclusiveLock();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// This should go away when we have the new AM and its events
|
// This should go away when we have the new AM and its events
|
||||||
// and we move xlock to the lock-event-queue.
|
// and we move xlock to the lock-event-queue.
|
||||||
public synchronized boolean isAvailable() {
|
public synchronized boolean isAvailable() {
|
||||||
return !exclusiveLock && !isEmpty();
|
return !hasExclusiveLock() && !isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
// ======================================================================
|
// ======================================================================
|
||||||
|
@ -1125,6 +1144,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
||||||
runnables.addLast(proc);
|
runnables.addLast(proc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Procedure peek() {
|
||||||
|
return runnables.peek();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Procedure poll() {
|
public Procedure poll() {
|
||||||
return runnables.poll();
|
return runnables.poll();
|
||||||
|
|
|
@ -182,12 +182,12 @@ public class ModifyColumnFamilyProcedure
|
||||||
@Override
|
@Override
|
||||||
protected boolean acquireLock(final MasterProcedureEnv env) {
|
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||||
if (env.waitInitialized(this)) return false;
|
if (env.waitInitialized(this)) return false;
|
||||||
return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "modify family");
|
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void releaseLock(final MasterProcedureEnv env) {
|
protected void releaseLock(final MasterProcedureEnv env) {
|
||||||
env.getProcedureQueue().releaseTableExclusiveLock(tableName);
|
env.getProcedureQueue().releaseTableExclusiveLock(this, tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -193,12 +193,12 @@ public class ModifyNamespaceProcedure
|
||||||
@Override
|
@Override
|
||||||
protected boolean acquireLock(final MasterProcedureEnv env) {
|
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||||
if (env.waitInitialized(this)) return false;
|
if (env.waitInitialized(this)) return false;
|
||||||
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(getNamespaceName());
|
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(this, getNamespaceName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void releaseLock(final MasterProcedureEnv env) {
|
protected void releaseLock(final MasterProcedureEnv env) {
|
||||||
env.getProcedureQueue().releaseNamespaceExclusiveLock(getNamespaceName());
|
env.getProcedureQueue().releaseNamespaceExclusiveLock(this, getNamespaceName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -215,12 +215,12 @@ public class ModifyTableProcedure
|
||||||
@Override
|
@Override
|
||||||
protected boolean acquireLock(final MasterProcedureEnv env) {
|
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||||
if (env.waitInitialized(this)) return false;
|
if (env.waitInitialized(this)) return false;
|
||||||
return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "modify table");
|
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void releaseLock(final MasterProcedureEnv env) {
|
protected void releaseLock(final MasterProcedureEnv env) {
|
||||||
env.getProcedureQueue().releaseTableExclusiveLock(getTableName());
|
env.getProcedureQueue().releaseTableExclusiveLock(this, getTableName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -554,12 +554,12 @@ implements ServerProcedureInterface {
|
||||||
@Override
|
@Override
|
||||||
protected boolean acquireLock(final MasterProcedureEnv env) {
|
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||||
if (env.waitServerCrashProcessingEnabled(this)) return false;
|
if (env.waitServerCrashProcessingEnabled(this)) return false;
|
||||||
return env.getProcedureQueue().tryAcquireServerExclusiveLock(getServerName());
|
return env.getProcedureQueue().tryAcquireServerExclusiveLock(this, getServerName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void releaseLock(final MasterProcedureEnv env) {
|
protected void releaseLock(final MasterProcedureEnv env) {
|
||||||
env.getProcedureQueue().releaseServerExclusiveLock(getServerName());
|
env.getProcedureQueue().releaseServerExclusiveLock(this, getServerName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -183,12 +183,12 @@ public class TruncateTableProcedure
|
||||||
@Override
|
@Override
|
||||||
protected boolean acquireLock(final MasterProcedureEnv env) {
|
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||||
if (env.waitInitialized(this)) return false;
|
if (env.waitInitialized(this)) return false;
|
||||||
return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "truncate table");
|
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void releaseLock(final MasterProcedureEnv env) {
|
protected void releaseLock(final MasterProcedureEnv env) {
|
||||||
env.getProcedureQueue().releaseTableExclusiveLock(getTableName());
|
env.getProcedureQueue().releaseTableExclusiveLock(this, getTableName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.master.procedure;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -78,9 +79,11 @@ public class TestMasterProcedureScheduler {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
|
TestTableProcedure proc = new TestTableProcedure(1, table,
|
||||||
|
TableProcedureInterface.TableOperationType.CREATE);
|
||||||
while (running.get() && !failure.get()) {
|
while (running.get() && !failure.get()) {
|
||||||
if (procQueue.tryAcquireTableExclusiveLock(table, "create")) {
|
if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
|
||||||
procQueue.releaseTableExclusiveLock(table);
|
procQueue.releaseTableExclusiveLock(proc, table);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
|
@ -94,9 +97,11 @@ public class TestMasterProcedureScheduler {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
|
TestTableProcedure proc = new TestTableProcedure(2, table,
|
||||||
|
TableProcedureInterface.TableOperationType.DELETE);
|
||||||
while (running.get() && !failure.get()) {
|
while (running.get() && !failure.get()) {
|
||||||
if (procQueue.tryAcquireTableExclusiveLock(table, "delete")) {
|
if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
|
||||||
procQueue.releaseTableExclusiveLock(table);
|
procQueue.releaseTableExclusiveLock(proc, table);
|
||||||
}
|
}
|
||||||
procQueue.markTableAsDeleted(table);
|
procQueue.markTableAsDeleted(table);
|
||||||
}
|
}
|
||||||
|
@ -143,8 +148,8 @@ public class TestMasterProcedureScheduler {
|
||||||
Procedure proc = queue.poll();
|
Procedure proc = queue.poll();
|
||||||
assertTrue(proc != null);
|
assertTrue(proc != null);
|
||||||
TableName tableName = ((TestTableProcedure)proc).getTableName();
|
TableName tableName = ((TestTableProcedure)proc).getTableName();
|
||||||
queue.tryAcquireTableExclusiveLock(tableName, "test");
|
queue.tryAcquireTableExclusiveLock(proc, tableName);
|
||||||
queue.releaseTableExclusiveLock(tableName);
|
queue.releaseTableExclusiveLock(proc, tableName);
|
||||||
queue.completionCleanup(proc);
|
queue.completionCleanup(proc);
|
||||||
assertEquals(--count, queue.size());
|
assertEquals(--count, queue.size());
|
||||||
assertEquals(i * 1000 + j, proc.getProcId());
|
assertEquals(i * 1000 + j, proc.getProcId());
|
||||||
|
@ -174,14 +179,15 @@ public class TestMasterProcedureScheduler {
|
||||||
assertFalse(queue.markTableAsDeleted(tableName));
|
assertFalse(queue.markTableAsDeleted(tableName));
|
||||||
|
|
||||||
// fetch item and take a lock
|
// fetch item and take a lock
|
||||||
assertEquals(1, queue.poll().getProcId());
|
Procedure proc = queue.poll();
|
||||||
|
assertEquals(1, proc.getProcId());
|
||||||
// take the xlock
|
// take the xlock
|
||||||
assertTrue(queue.tryAcquireTableExclusiveLock(tableName, "write"));
|
assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
|
||||||
// table can't be deleted because we have the lock
|
// table can't be deleted because we have the lock
|
||||||
assertEquals(0, queue.size());
|
assertEquals(0, queue.size());
|
||||||
assertFalse(queue.markTableAsDeleted(tableName));
|
assertFalse(queue.markTableAsDeleted(tableName));
|
||||||
// release the xlock
|
// release the xlock
|
||||||
queue.releaseTableExclusiveLock(tableName);
|
queue.releaseTableExclusiveLock(proc, tableName);
|
||||||
// complete the table deletion
|
// complete the table deletion
|
||||||
assertTrue(queue.markTableAsDeleted(tableName));
|
assertTrue(queue.markTableAsDeleted(tableName));
|
||||||
}
|
}
|
||||||
|
@ -203,20 +209,22 @@ public class TestMasterProcedureScheduler {
|
||||||
// table can't be deleted because one item is in the queue
|
// table can't be deleted because one item is in the queue
|
||||||
assertFalse(queue.markTableAsDeleted(tableName));
|
assertFalse(queue.markTableAsDeleted(tableName));
|
||||||
|
|
||||||
for (int i = 1; i <= nitems; ++i) {
|
Procedure[] procs = new Procedure[nitems];
|
||||||
|
for (int i = 0; i < nitems; ++i) {
|
||||||
// fetch item and take a lock
|
// fetch item and take a lock
|
||||||
assertEquals(i, queue.poll().getProcId());
|
Procedure proc = procs[i] = queue.poll();
|
||||||
|
assertEquals(i + 1, proc.getProcId());
|
||||||
// take the rlock
|
// take the rlock
|
||||||
assertTrue(queue.tryAcquireTableSharedLock(tableName, "read " + i));
|
assertTrue(queue.tryAcquireTableSharedLock(proc, tableName));
|
||||||
// table can't be deleted because we have locks and/or items in the queue
|
// table can't be deleted because we have locks and/or items in the queue
|
||||||
assertFalse(queue.markTableAsDeleted(tableName));
|
assertFalse(queue.markTableAsDeleted(tableName));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 1; i <= nitems; ++i) {
|
for (int i = 0; i < nitems; ++i) {
|
||||||
// table can't be deleted because we have locks
|
// table can't be deleted because we have locks
|
||||||
assertFalse(queue.markTableAsDeleted(tableName));
|
assertFalse(queue.markTableAsDeleted(tableName));
|
||||||
// release the rlock
|
// release the rlock
|
||||||
queue.releaseTableSharedLock(tableName);
|
queue.releaseTableSharedLock(procs[i], tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
// there are no items and no lock in the queeu
|
// there are no items and no lock in the queeu
|
||||||
|
@ -243,49 +251,49 @@ public class TestMasterProcedureScheduler {
|
||||||
TableProcedureInterface.TableOperationType.READ));
|
TableProcedureInterface.TableOperationType.READ));
|
||||||
|
|
||||||
// Fetch the 1st item and take the write lock
|
// Fetch the 1st item and take the write lock
|
||||||
long procId = queue.poll().getProcId();
|
Procedure proc = queue.poll();
|
||||||
assertEquals(1, procId);
|
assertEquals(1, proc.getProcId());
|
||||||
assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
|
assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName));
|
||||||
|
|
||||||
// Fetch the 2nd item and verify that the lock can't be acquired
|
// Fetch the 2nd item and verify that the lock can't be acquired
|
||||||
assertEquals(null, queue.poll(0));
|
assertEquals(null, queue.poll(0));
|
||||||
|
|
||||||
// Release the write lock and acquire the read lock
|
// Release the write lock and acquire the read lock
|
||||||
queue.releaseTableExclusiveLock(tableName);
|
queue.releaseTableExclusiveLock(proc, tableName);
|
||||||
|
|
||||||
// Fetch the 2nd item and take the read lock
|
// Fetch the 2nd item and take the read lock
|
||||||
procId = queue.poll().getProcId();
|
Procedure rdProc = queue.poll();
|
||||||
assertEquals(2, procId);
|
assertEquals(2, rdProc.getProcId());
|
||||||
assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
|
assertEquals(true, queue.tryAcquireTableSharedLock(rdProc, tableName));
|
||||||
|
|
||||||
// Fetch the 3rd item and verify that the lock can't be acquired
|
// Fetch the 3rd item and verify that the lock can't be acquired
|
||||||
procId = queue.poll().getProcId();
|
Procedure wrProc = queue.poll();
|
||||||
assertEquals(3, procId);
|
assertEquals(3, wrProc.getProcId());
|
||||||
assertEquals(false, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
|
assertEquals(false, queue.tryAcquireTableExclusiveLock(wrProc, tableName));
|
||||||
|
|
||||||
// release the rdlock of item 2 and take the wrlock for the 3d item
|
// release the rdlock of item 2 and take the wrlock for the 3d item
|
||||||
queue.releaseTableSharedLock(tableName);
|
queue.releaseTableSharedLock(rdProc, tableName);
|
||||||
assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
|
assertEquals(true, queue.tryAcquireTableExclusiveLock(wrProc, tableName));
|
||||||
|
|
||||||
// Fetch 4th item and verify that the lock can't be acquired
|
// Fetch 4th item and verify that the lock can't be acquired
|
||||||
assertEquals(null, queue.poll(0));
|
assertEquals(null, queue.poll(0));
|
||||||
|
|
||||||
// Release the write lock and acquire the read lock
|
// Release the write lock and acquire the read lock
|
||||||
queue.releaseTableExclusiveLock(tableName);
|
queue.releaseTableExclusiveLock(wrProc, tableName);
|
||||||
|
|
||||||
// Fetch the 4th item and take the read lock
|
// Fetch the 4th item and take the read lock
|
||||||
procId = queue.poll().getProcId();
|
rdProc = queue.poll();
|
||||||
assertEquals(4, procId);
|
assertEquals(4, rdProc.getProcId());
|
||||||
assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
|
assertEquals(true, queue.tryAcquireTableSharedLock(rdProc, tableName));
|
||||||
|
|
||||||
// Fetch the 4th item and take the read lock
|
// Fetch the 4th item and take the read lock
|
||||||
procId = queue.poll().getProcId();
|
Procedure rdProc2 = queue.poll();
|
||||||
assertEquals(5, procId);
|
assertEquals(5, rdProc2.getProcId());
|
||||||
assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
|
assertEquals(true, queue.tryAcquireTableSharedLock(rdProc2, tableName));
|
||||||
|
|
||||||
// Release 4th and 5th read-lock
|
// Release 4th and 5th read-lock
|
||||||
queue.releaseTableSharedLock(tableName);
|
queue.releaseTableSharedLock(rdProc, tableName);
|
||||||
queue.releaseTableSharedLock(tableName);
|
queue.releaseTableSharedLock(rdProc2, tableName);
|
||||||
|
|
||||||
// remove table queue
|
// remove table queue
|
||||||
assertEquals(0, queue.size());
|
assertEquals(0, queue.size());
|
||||||
|
@ -308,36 +316,36 @@ public class TestMasterProcedureScheduler {
|
||||||
TableProcedureInterface.TableOperationType.EDIT));
|
TableProcedureInterface.TableOperationType.EDIT));
|
||||||
|
|
||||||
// Fetch the 1st item and take the write lock
|
// Fetch the 1st item and take the write lock
|
||||||
long procId = queue.poll().getProcId();
|
Procedure procNs1 = queue.poll();
|
||||||
assertEquals(1, procId);
|
assertEquals(1, procNs1.getProcId());
|
||||||
assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(nsName1));
|
assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(procNs1, nsName1));
|
||||||
|
|
||||||
// System tables have 2 as default priority
|
// System tables have 2 as default priority
|
||||||
Procedure proc = queue.poll();
|
Procedure procNs2 = queue.poll();
|
||||||
assertEquals(4, proc.getProcId());
|
assertEquals(4, procNs2.getProcId());
|
||||||
assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(nsName2));
|
assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(procNs2, nsName2));
|
||||||
queue.releaseNamespaceExclusiveLock(nsName2);
|
queue.releaseNamespaceExclusiveLock(procNs2, nsName2);
|
||||||
queue.yield(proc);
|
queue.yield(procNs2);
|
||||||
|
|
||||||
// table on ns1 is locked, so we get table on ns2
|
// table on ns1 is locked, so we get table on ns2
|
||||||
procId = queue.poll().getProcId();
|
procNs2 = queue.poll();
|
||||||
assertEquals(3, procId);
|
assertEquals(3, procNs2.getProcId());
|
||||||
assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName2, "lock " + procId));
|
assertEquals(true, queue.tryAcquireTableExclusiveLock(procNs2, tableName2));
|
||||||
|
|
||||||
// ns2 is not available (TODO we may avoid this one)
|
// ns2 is not available (TODO we may avoid this one)
|
||||||
proc = queue.poll();
|
Procedure procNs2b = queue.poll();
|
||||||
assertEquals(4, proc.getProcId());
|
assertEquals(4, procNs2b.getProcId());
|
||||||
assertEquals(false, queue.tryAcquireNamespaceExclusiveLock(nsName2));
|
assertEquals(false, queue.tryAcquireNamespaceExclusiveLock(procNs2b, nsName2));
|
||||||
queue.yield(proc);
|
queue.yield(procNs2b);
|
||||||
|
|
||||||
// release the ns1 lock
|
// release the ns1 lock
|
||||||
queue.releaseNamespaceExclusiveLock(nsName1);
|
queue.releaseNamespaceExclusiveLock(procNs1, nsName1);
|
||||||
|
|
||||||
// we are now able to execute table of ns1
|
// we are now able to execute table of ns1
|
||||||
procId = queue.poll().getProcId();
|
long procId = queue.poll().getProcId();
|
||||||
assertEquals(2, procId);
|
assertEquals(2, procId);
|
||||||
|
|
||||||
queue.releaseTableExclusiveLock(tableName2);
|
queue.releaseTableExclusiveLock(procNs2, tableName2);
|
||||||
|
|
||||||
// we are now able to execute ns2
|
// we are now able to execute ns2
|
||||||
procId = queue.poll().getProcId();
|
procId = queue.poll().getProcId();
|
||||||
|
@ -375,7 +383,7 @@ public class TestMasterProcedureScheduler {
|
||||||
public void run() {
|
public void run() {
|
||||||
while (opsCount.get() > 0) {
|
while (opsCount.get() > 0) {
|
||||||
try {
|
try {
|
||||||
TableProcedureInterface proc = procSet.acquire();
|
Procedure proc = procSet.acquire();
|
||||||
if (proc == null) {
|
if (proc == null) {
|
||||||
queue.signalAll();
|
queue.signalAll();
|
||||||
if (opsCount.get() > 0) {
|
if (opsCount.get() > 0) {
|
||||||
|
@ -383,14 +391,14 @@ public class TestMasterProcedureScheduler {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TableName tableId = procSet.getTableName(proc);
|
||||||
synchronized (concurrentTables) {
|
synchronized (concurrentTables) {
|
||||||
assertTrue("unexpected concurrency on " + proc.getTableName(),
|
assertTrue("unexpected concurrency on " + tableId, concurrentTables.add(tableId));
|
||||||
concurrentTables.add(proc.getTableName()));
|
|
||||||
}
|
}
|
||||||
assertTrue(opsCount.decrementAndGet() >= 0);
|
assertTrue(opsCount.decrementAndGet() >= 0);
|
||||||
try {
|
try {
|
||||||
long procId = ((Procedure)proc).getProcId();
|
long procId = proc.getProcId();
|
||||||
TableName tableId = proc.getTableName();
|
|
||||||
int concurrent = concurrentCount.incrementAndGet();
|
int concurrent = concurrentCount.incrementAndGet();
|
||||||
assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
|
assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
|
||||||
concurrent >= 1 && concurrent <= NUM_TABLES);
|
concurrent >= 1 && concurrent <= NUM_TABLES);
|
||||||
|
@ -401,7 +409,7 @@ public class TestMasterProcedureScheduler {
|
||||||
assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
|
assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
|
||||||
} finally {
|
} finally {
|
||||||
synchronized (concurrentTables) {
|
synchronized (concurrentTables) {
|
||||||
assertTrue(concurrentTables.remove(proc.getTableName()));
|
assertTrue(concurrentTables.remove(tableId));
|
||||||
}
|
}
|
||||||
procSet.release(proc);
|
procSet.release(proc);
|
||||||
}
|
}
|
||||||
|
@ -433,43 +441,36 @@ public class TestMasterProcedureScheduler {
|
||||||
|
|
||||||
public static class TestTableProcSet {
|
public static class TestTableProcSet {
|
||||||
private final MasterProcedureScheduler queue;
|
private final MasterProcedureScheduler queue;
|
||||||
private Map<Long, TableProcedureInterface> procsMap =
|
|
||||||
new ConcurrentHashMap<Long, TableProcedureInterface>();
|
|
||||||
|
|
||||||
public TestTableProcSet(final MasterProcedureScheduler queue) {
|
public TestTableProcSet(final MasterProcedureScheduler queue) {
|
||||||
this.queue = queue;
|
this.queue = queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addBack(TableProcedureInterface tableProc) {
|
public void addBack(Procedure proc) {
|
||||||
Procedure proc = (Procedure)tableProc;
|
|
||||||
procsMap.put(proc.getProcId(), tableProc);
|
|
||||||
queue.addBack(proc);
|
queue.addBack(proc);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addFront(TableProcedureInterface tableProc) {
|
public void addFront(Procedure proc) {
|
||||||
Procedure proc = (Procedure)tableProc;
|
|
||||||
procsMap.put(proc.getProcId(), tableProc);
|
|
||||||
queue.addFront(proc);
|
queue.addFront(proc);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TableProcedureInterface acquire() {
|
public Procedure acquire() {
|
||||||
TableProcedureInterface proc = null;
|
Procedure proc = null;
|
||||||
boolean avail = false;
|
boolean avail = false;
|
||||||
while (!avail) {
|
while (!avail) {
|
||||||
Procedure xProc = queue.poll();
|
proc = queue.poll();
|
||||||
proc = xProc != null ? procsMap.remove(xProc.getProcId()) : null;
|
|
||||||
if (proc == null) break;
|
if (proc == null) break;
|
||||||
switch (proc.getTableOperationType()) {
|
switch (getTableOperationType(proc)) {
|
||||||
case CREATE:
|
case CREATE:
|
||||||
case DELETE:
|
case DELETE:
|
||||||
case EDIT:
|
case EDIT:
|
||||||
avail = queue.tryAcquireTableExclusiveLock(proc.getTableName(),
|
avail = queue.tryAcquireTableExclusiveLock(proc, getTableName(proc));
|
||||||
"op="+ proc.getTableOperationType());
|
|
||||||
break;
|
break;
|
||||||
case READ:
|
case READ:
|
||||||
avail = queue.tryAcquireTableSharedLock(proc.getTableName(),
|
avail = queue.tryAcquireTableSharedLock(proc, getTableName(proc));
|
||||||
"op="+ proc.getTableOperationType());
|
|
||||||
break;
|
break;
|
||||||
|
default:
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
if (!avail) {
|
if (!avail) {
|
||||||
addFront(proc);
|
addFront(proc);
|
||||||
|
@ -479,18 +480,26 @@ public class TestMasterProcedureScheduler {
|
||||||
return proc;
|
return proc;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void release(TableProcedureInterface proc) {
|
public void release(Procedure proc) {
|
||||||
switch (proc.getTableOperationType()) {
|
switch (getTableOperationType(proc)) {
|
||||||
case CREATE:
|
case CREATE:
|
||||||
case DELETE:
|
case DELETE:
|
||||||
case EDIT:
|
case EDIT:
|
||||||
queue.releaseTableExclusiveLock(proc.getTableName());
|
queue.releaseTableExclusiveLock(proc, getTableName(proc));
|
||||||
break;
|
break;
|
||||||
case READ:
|
case READ:
|
||||||
queue.releaseTableSharedLock(proc.getTableName());
|
queue.releaseTableSharedLock(proc, getTableName(proc));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public TableName getTableName(Procedure proc) {
|
||||||
|
return ((TableProcedureInterface)proc).getTableName();
|
||||||
|
}
|
||||||
|
|
||||||
|
public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) {
|
||||||
|
return ((TableProcedureInterface)proc).getTableOperationType();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class TestTableProcedure extends TestProcedure
|
public static class TestTableProcedure extends TestProcedure
|
||||||
|
|
Loading…
Reference in New Issue