diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java index 4a15857df3a..0cd4103fa02 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java @@ -222,10 +222,10 @@ public class ProcedureInfo { procProto.getOwner(), procProto.getState(), procProto.hasParentId() ? procProto.getParentId() : -1, - procProto.getState() == ProcedureState.ROLLEDBACK ? procProto.getException() : null, + procProto.hasException() ? procProto.getException() : null, procProto.getLastUpdate(), procProto.getStartTime(), - procProto.getState() == ProcedureState.FINISHED ? procProto.getResult().toByteArray() : null); + procProto.hasResult() ? procProto.getResult().toByteArray() : null); } /** diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 67ab1191b95..95990e89974 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -785,8 +785,7 @@ public class ProcedureExecutor { */ private void execLoop() { while (isRunning()) { - Long procId = runnables.poll(); - Procedure proc = procId != null ? procedures.get(procId) : null; + Procedure proc = runnables.poll(); if (proc == null) continue; try { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java deleted file mode 100644 index 242ae868e7d..00000000000 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.procedure2; - -import java.util.Map; - -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.ConcurrentSkipListMap; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -/** - * This class is a container of queues that allows to select a queue - * in a round robin fashion, considering priority of the queue. - * - * the quantum is just how many poll() will return the same object. - * e.g. if quantum is 1 and you have A and B as object you'll get: A B A B - * e.g. if quantum is 2 and you have A and B as object you'll get: A A B B A A B B - * then the object priority is just a priority * quantum - * - * Example: - * - three queues (A, B, C) with priorities (1, 1, 2) - * - The first poll() will return A - * - The second poll() will return B - * - The third and forth poll() will return C - * - and so on again and again. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class ProcedureFairRunQueues { - private ConcurrentSkipListMap objMap = - new ConcurrentSkipListMap(); - - private final ReentrantLock lock = new ReentrantLock(); - private final int quantum; - - private Map.Entry current = null; - private int currentQuantum = 0; - - public interface FairObject { - boolean isAvailable(); - int getPriority(); - } - - /** - * @param quantum how many poll() will return the same object. - */ - public ProcedureFairRunQueues(final int quantum) { - this.quantum = quantum; - } - - public TQueue get(final TKey key) { - return objMap.get(key); - } - - public TQueue add(final TKey key, final TQueue queue) { - TQueue oldq = objMap.putIfAbsent(key, queue); - return oldq != null ? oldq : queue; - } - - public TQueue remove(final TKey key) { - TQueue queue = objMap.get(key); - if (queue != null) { - lock.lock(); - try { - queue = objMap.remove(key); - if (current != null && queue == current.getValue()) { - currentQuantum = 0; - current = null; - } - } finally { - lock.unlock(); - } - } - return queue; - } - - public void clear() { - lock.lock(); - try { - currentQuantum = 0; - current = null; - objMap.clear(); - } finally { - lock.unlock(); - } - } - - /** - * @return the next available item if present - */ - public TQueue poll() { - lock.lock(); - try { - TQueue queue; - if (currentQuantum == 0) { - if (nextObject() == null) { - // nothing here - return null; - } - - queue = current.getValue(); - currentQuantum = calculateQuantum(queue) - 1; - } else { - currentQuantum--; - queue = current.getValue(); - } - - if (!queue.isAvailable()) { - Map.Entry last = current; - // Try the next one - do { - if (nextObject() == null) - return null; - } while (current.getValue() != last.getValue() && !current.getValue().isAvailable()); - - queue = current.getValue(); - currentQuantum = calculateQuantum(queue) - 1; - } - - return queue; - } finally { - lock.unlock(); - } - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append('{'); - for (Map.Entry entry: objMap.entrySet()) { - builder.append(entry.getKey()); - builder.append(':'); - builder.append(entry.getValue()); - } - builder.append('}'); - return builder.toString(); - } - - private Map.Entry nextObject() { - Map.Entry next = null; - - // If we have already a key, try the next one - if (current != null) { - next = objMap.higherEntry(current.getKey()); - } - - // if there is no higher key, go back to the first - current = (next != null) ? next : objMap.firstEntry(); - return current; - } - - private int calculateQuantum(final TQueue fairObject) { - // TODO - return Math.max(1, fairObject.getPriority() * quantum); - } -} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java index 2d7ba39be36..65df692f3e2 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java @@ -55,9 +55,9 @@ public interface ProcedureRunnableSet { /** * Fetch one Procedure from the queue - * @return the Procedure ID to execute, or null if nothing present. + * @return the Procedure to execute, or null if nothing present. */ - Long poll(); + Procedure poll(); /** * In case the class is blocking on poll() waiting for items to be added, diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java index 7b17fb26f18..d23680dd367 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Evolving public class ProcedureSimpleRunQueue implements ProcedureRunnableSet { - private final Deque runnables = new ArrayDeque(); + private final Deque runnables = new ArrayDeque(); private final ReentrantLock lock = new ReentrantLock(); private final Condition waitCond = lock.newCondition(); @@ -40,7 +40,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet { public void addFront(final Procedure proc) { lock.lock(); try { - runnables.addFirst(proc.getProcId()); + runnables.addFirst(proc); waitCond.signal(); } finally { lock.unlock(); @@ -51,7 +51,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet { public void addBack(final Procedure proc) { lock.lock(); try { - runnables.addLast(proc.getProcId()); + runnables.addLast(proc); waitCond.signal(); } finally { lock.unlock(); @@ -65,7 +65,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet { @Override @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") - public Long poll() { + public Procedure poll() { lock.lock(); try { if (runnables.isEmpty()) { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java deleted file mode 100644 index 4a36665c724..00000000000 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.procedure2; - -import org.apache.hadoop.hbase.testclassification.SmallTests; - -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import static org.junit.Assert.assertEquals; - -@Category(SmallTests.class) -public class TestProcedureFairRunQueues { - private static class TestRunQueue implements ProcedureFairRunQueues.FairObject { - private final int priority; - private final String name; - - private boolean available = true; - - public TestRunQueue(String name, int priority) { - this.name = name; - this.priority = priority; - } - - @Override - public String toString() { - return name; - } - - private void setAvailable(boolean available) { - this.available = available; - } - - @Override - public boolean isAvailable() { - return available; - } - - @Override - public int getPriority() { - return priority; - } - } - - @Test - public void testEmptyFairQueues() throws Exception { - ProcedureFairRunQueues fairq - = new ProcedureFairRunQueues(1); - for (int i = 0; i < 3; ++i) { - assertEquals(null, fairq.poll()); - } - } - - @Test - public void testFairQueues() throws Exception { - ProcedureFairRunQueues fairq - = new ProcedureFairRunQueues(1); - TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1)); - TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1)); - TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2)); - - for (int i = 0; i < 3; ++i) { - assertEquals(a, fairq.poll()); - assertEquals(b, fairq.poll()); - assertEquals(m, fairq.poll()); - assertEquals(m, fairq.poll()); - } - } - - @Test - public void testFairQueuesNotAvailable() throws Exception { - ProcedureFairRunQueues fairq - = new ProcedureFairRunQueues(1); - TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1)); - TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1)); - TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2)); - - // m is not available - m.setAvailable(false); - for (int i = 0; i < 3; ++i) { - assertEquals(a, fairq.poll()); - assertEquals(b, fairq.poll()); - } - - // m is available - m.setAvailable(true); - for (int i = 0; i < 3; ++i) { - assertEquals(m, fairq.poll()); - assertEquals(m, fairq.poll()); - assertEquals(a, fairq.poll()); - assertEquals(b, fairq.poll()); - } - - // b is not available - b.setAvailable(false); - for (int i = 0; i < 3; ++i) { - assertEquals(m, fairq.poll()); - assertEquals(m, fairq.poll()); - assertEquals(a, fairq.poll()); - } - - assertEquals(m, fairq.poll()); - m.setAvailable(false); - // m should be fetched next, but is no longer available - assertEquals(a, fairq.poll()); - assertEquals(a, fairq.poll()); - b.setAvailable(true); - for (int i = 0; i < 3; ++i) { - assertEquals(b, fairq.poll()); - assertEquals(a, fairq.poll()); - } - } - - @Test - public void testFairQueuesDelete() throws Exception { - ProcedureFairRunQueues fairq - = new ProcedureFairRunQueues(1); - TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1)); - TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1)); - TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2)); - - // Fetch A and then remove it - assertEquals(a, fairq.poll()); - assertEquals(a, fairq.remove("A")); - - // Fetch B and then remove it - assertEquals(b, fairq.poll()); - assertEquals(b, fairq.remove("B")); - - // Fetch M and then remove it - assertEquals(m, fairq.poll()); - assertEquals(m, fairq.remove("M")); - - // nothing left - assertEquals(null, fairq.poll()); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 2999789a3d5..1fcc75117cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -107,6 +107,7 @@ import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyNamespaceProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; @@ -280,14 +281,15 @@ public class HMaster extends HRegionServer implements MasterServices, Server { // flag set after we complete initialization once active, // it is not private since it's used in unit tests - volatile boolean initialized = false; + private final ProcedureEvent initialized = new ProcedureEvent("master initialized"); // flag set after master services are started, // initialization may have not completed yet. volatile boolean serviceStarted = false; // flag set after we complete assignMeta. - private volatile boolean serverCrashProcessingEnabled = false; + private final ProcedureEvent serverCrashProcessingEnabled = + new ProcedureEvent("server crash processing"); LoadBalancer balancer; private RegionNormalizer normalizer; @@ -783,7 +785,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { configurationManager.registerObserver(this.balancer); // Set master as 'initialized'. - initialized = true; + setInitialized(true); status.setStatus("Starting quota manager"); initQuotaManager(); @@ -1002,8 +1004,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server { // servers. This is required so that if meta is assigning to a server which dies after // assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be // stuck here waiting forever if waitForMeta is specified. - if (!serverCrashProcessingEnabled) { - serverCrashProcessingEnabled = true; + if (!isServerCrashProcessingEnabled()) { + setServerCrashProcessingEnabled(true); this.serverManager.processQueuedDeadServers(); } @@ -1240,7 +1242,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { public boolean balance(boolean force) throws IOException { // if master not initialized, don't run balancer. - if (!this.initialized) { + if (!isInitialized()) { LOG.debug("Master has not been initialized, don't run balancer."); return false; } @@ -1337,7 +1339,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { * @throws CoordinatedStateException */ public boolean normalizeRegions() throws IOException, CoordinatedStateException { - if (!this.initialized) { + if (!isInitialized()) { LOG.debug("Master has not been initialized, don't run region normalizer."); return false; } @@ -1648,7 +1650,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { } } - private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd) + private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd) throws IOException { // FIFO compaction has some requirements // Actually FCP ignores periodic major compactions @@ -1705,7 +1707,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { } } } - + // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled. private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey, String message, Exception cause) throws IOException { @@ -2301,7 +2303,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException { checkServiceStarted(); - if (!this.initialized) { + if (!isInitialized()) { throw new PleaseHoldException("Master is initializing"); } } @@ -2336,6 +2338,15 @@ public class HMaster extends HRegionServer implements MasterServices, Server { */ @Override public boolean isInitialized() { + return initialized.isReady(); + } + + @VisibleForTesting + public void setInitialized(boolean isInitialized) { + procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized); + } + + public ProcedureEvent getInitializedEvent() { return initialized; } @@ -2346,12 +2357,16 @@ public class HMaster extends HRegionServer implements MasterServices, Server { */ @Override public boolean isServerCrashProcessingEnabled() { - return this.serverCrashProcessingEnabled; + return serverCrashProcessingEnabled.isReady(); } @VisibleForTesting public void setServerCrashProcessingEnabled(final boolean b) { - this.serverCrashProcessingEnabled = b; + procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b); + } + + public ProcedureEvent getServerCrashProcessingEnabledEvent() { + return serverCrashProcessingEnabled; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java index b6642a07a5d..3a98b0c4bdc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -185,10 +184,8 @@ public class AddColumnFamilyProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - tableName, - EventType.C_M_ADD_FAMILY.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "add family"); } @Override @@ -405,4 +402,4 @@ public class AddColumnFamilyProcedure } return regionInfoList; } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java index 657bbfb34bf..55fe5c8dafc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java @@ -205,7 +205,9 @@ public class CreateNamespaceProcedure return true; } - return false; + if (env.waitInitialized(this)) { + return false; + } } return getTableNamespaceManager(env).acquireExclusiveLock(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java index 8bcd3de821b..ad069bc3cb8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java @@ -270,7 +270,7 @@ public class CreateTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized() && !getTableName().isSystemTable()) { + if (!getTableName().isSystemTable() && env.waitInitialized(this)) { return false; } return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "create table"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java index c15ab98ec8e..17cf5b6870c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -201,10 +200,8 @@ public class DeleteColumnFamilyProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - tableName, - EventType.C_M_DELETE_FAMILY.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "delete family"); } @Override @@ -442,4 +439,4 @@ public class DeleteColumnFamilyProcedure } return regionInfoList; } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java index 46345a52bc3..71c6c2d4b1d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java @@ -198,7 +198,7 @@ public class DeleteTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; + if (env.waitInitialized(this)) return false; return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "delete table"); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java index dbfa694f7c3..8e80a1978bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableStateManager; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.constraint.ConstraintException; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.exceptions.HBaseException; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.BulkAssigner; @@ -215,10 +214,8 @@ public class DisableTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - tableName, - EventType.C_M_DISABLE_TABLE.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "disable table"); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java index 7201dc79aeb..e54d6f88001 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java @@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableStateManager; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.exceptions.HBaseException; +import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.BulkAssigner; import org.apache.hadoop.hbase.master.GeneralBulkAssigner; @@ -239,10 +239,8 @@ public class EnableTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - tableName, - EventType.C_M_ENABLE_TABLE.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "enable table"); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 6700b63a8b6..090b8ccc5da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.security.User; @@ -85,12 +87,12 @@ public class MasterProcedureEnv { } } - private final MasterProcedureQueue procQueue; + private final MasterProcedureScheduler procSched; private final MasterServices master; public MasterProcedureEnv(final MasterServices master) { this.master = master; - this.procQueue = new MasterProcedureQueue(master.getConfiguration(), + this.procSched = new MasterProcedureScheduler(master.getConfiguration(), master.getTableLockManager()); } @@ -114,8 +116,8 @@ public class MasterProcedureEnv { return master.getMasterCoprocessorHost(); } - public MasterProcedureQueue getProcedureQueue() { - return procQueue; + public MasterProcedureScheduler getProcedureQueue() { + return procSched; } public boolean isRunning() { @@ -125,4 +127,28 @@ public class MasterProcedureEnv { public boolean isInitialized() { return master.isInitialized(); } + + public boolean waitInitialized(Procedure proc) { + return procSched.waitEvent(((HMaster)master).getInitializedEvent(), proc); + } + + public boolean waitServerCrashProcessingEnabled(Procedure proc) { + return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc); + } + + public void wake(ProcedureEvent event) { + procSched.wake(event); + } + + public void suspend(ProcedureEvent event) { + procSched.suspend(event); + } + + public void setEventReady(ProcedureEvent event, boolean isReady) { + if (isReady) { + procSched.wake(event); + } else { + procSched.suspend(event); + } + } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java deleted file mode 100644 index c4c774743b0..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java +++ /dev/null @@ -1,578 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.master.procedure; - -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableExistsException; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureFairRunQueues; -import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet; -import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.master.TableLockManager.TableLock; -import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; - -/** - * ProcedureRunnableSet for the Master Procedures. - * This RunnableSet tries to provide to the ProcedureExecutor procedures - * that can be executed without having to wait on a lock. - * Most of the master operations can be executed concurrently, if they - * are operating on different tables (e.g. two create table can be performed - * at the same, time assuming table A and table B) or against two different servers; say - * two servers that crashed at about the same time. - * - *

Each procedure should implement an interface providing information for this queue. - * for example table related procedures should implement TableProcedureInterface. - * each procedure will be pushed in its own queue, and based on the operation type - * we may take smarter decision. e.g. we can abort all the operations preceding - * a delete table, or similar. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class MasterProcedureQueue implements ProcedureRunnableSet { - private static final Log LOG = LogFactory.getLog(MasterProcedureQueue.class); - - // Two queues to ensure that server procedures run ahead of table precedures always. - private final ProcedureFairRunQueues tableFairQ; - /** - * Rely on basic fair q. ServerCrashProcedure will yield if meta is not assigned. This way, the - * server that was carrying meta should rise to the top of the queue (this is how it used to - * work when we had handlers and ServerShutdownHandler ran). TODO: special handling of servers - * that were carrying system tables on crash; do I need to have these servers have priority? - * - *

Apart from the special-casing of meta and system tables, fairq is what we want - */ - private final ProcedureFairRunQueues serverFairQ; - - private final ReentrantLock lock = new ReentrantLock(); - private final Condition waitCond = lock.newCondition(); - private final TableLockManager lockManager; - - private final int metaTablePriority; - private final int userTablePriority; - private final int sysTablePriority; - private static final int DEFAULT_SERVER_PRIORITY = 1; - - /** - * Keeps count across server and table queues. - */ - private int queueSize; - - public MasterProcedureQueue(final Configuration conf, final TableLockManager lockManager) { - this.tableFairQ = new ProcedureFairRunQueues(1); - this.serverFairQ = new ProcedureFairRunQueues(1); - this.lockManager = lockManager; - - // TODO: should this be part of the HTD? - metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3); - sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2); - userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1); - } - - @Override - public void addFront(final Procedure proc) { - lock.lock(); - try { - getRunQueueOrCreate(proc).addFront(proc); - queueSize++; - waitCond.signal(); - } finally { - lock.unlock(); - } - } - - @Override - public void addBack(final Procedure proc) { - lock.lock(); - try { - getRunQueueOrCreate(proc).addBack(proc); - queueSize++; - waitCond.signal(); - } finally { - lock.unlock(); - } - } - - @Override - public void yield(final Procedure proc) { - addBack(proc); - } - - @Override - @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") - public Long poll() { - Long pollResult = null; - lock.lock(); - try { - if (queueSize == 0) { - waitCond.await(); - if (queueSize == 0) { - return null; - } - } - // For now, let server handling have precedence over table handling; presumption is that it - // is more important handling crashed servers than it is running the - // enabling/disabling tables, etc. - pollResult = doPoll(serverFairQ.poll()); - if (pollResult == null) { - pollResult = doPoll(tableFairQ.poll()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - lock.unlock(); - } - return pollResult; - } - - private Long doPoll(final RunQueue rq) { - if (rq == null || !rq.isAvailable()) return null; - this.queueSize--; - return rq.poll(); - } - - @Override - public void signalAll() { - lock.lock(); - try { - waitCond.signalAll(); - } finally { - lock.unlock(); - } - } - - @Override - public void clear() { - lock.lock(); - try { - serverFairQ.clear(); - tableFairQ.clear(); - queueSize = 0; - } finally { - lock.unlock(); - } - } - - @Override - public int size() { - lock.lock(); - try { - return queueSize; - } finally { - lock.unlock(); - } - } - - @Override - public String toString() { - lock.lock(); - try { - return "MasterProcedureQueue size=" + queueSize + ": tableFairQ: " + tableFairQ + - ", serverFairQ: " + serverFairQ; - } finally { - lock.unlock(); - } - } - - @Override - public void completionCleanup(Procedure proc) { - if (proc instanceof TableProcedureInterface) { - TableProcedureInterface iProcTable = (TableProcedureInterface)proc; - boolean tableDeleted; - if (proc.hasException()) { - IOException procEx = proc.getException().unwrapRemoteException(); - if (iProcTable.getTableOperationType() == TableOperationType.CREATE) { - // create failed because the table already exist - tableDeleted = !(procEx instanceof TableExistsException); - } else { - // the operation failed because the table does not exist - tableDeleted = (procEx instanceof TableNotFoundException); - } - } else { - // the table was deleted - tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE); - } - if (tableDeleted) { - markTableAsDeleted(iProcTable.getTableName()); - } - } - // No cleanup for ServerProcedureInterface types, yet. - } - - private RunQueue getRunQueueOrCreate(final Procedure proc) { - if (proc instanceof TableProcedureInterface) { - final TableName table = ((TableProcedureInterface)proc).getTableName(); - return getRunQueueOrCreate(table); - } - if (proc instanceof ServerProcedureInterface) { - return getRunQueueOrCreate((ServerProcedureInterface)proc); - } - // TODO: at the moment we only have Table and Server procedures - // if you are implementing a non-table/non-server procedure, you have two options: create - // a group for all the non-table/non-server procedures or try to find a key for your - // non-table/non-server procedures and implement something similar to the TableRunQueue. - throw new UnsupportedOperationException("RQs for non-table procedures are not implemented yet"); - } - - private TableRunQueue getRunQueueOrCreate(final TableName table) { - final TableRunQueue queue = getRunQueue(table); - if (queue != null) return queue; - return (TableRunQueue)tableFairQ.add(table, createTableRunQueue(table)); - } - - private ServerRunQueue getRunQueueOrCreate(final ServerProcedureInterface spi) { - final ServerRunQueue queue = getRunQueue(spi.getServerName()); - if (queue != null) return queue; - return (ServerRunQueue)serverFairQ.add(spi.getServerName(), createServerRunQueue(spi)); - } - - private TableRunQueue createTableRunQueue(final TableName table) { - int priority = userTablePriority; - if (table.equals(TableName.META_TABLE_NAME)) { - priority = metaTablePriority; - } else if (table.isSystemTable()) { - priority = sysTablePriority; - } - return new TableRunQueue(priority); - } - - private ServerRunQueue createServerRunQueue(final ServerProcedureInterface spi) { - return new ServerRunQueue(DEFAULT_SERVER_PRIORITY); - } - - private TableRunQueue getRunQueue(final TableName table) { - return (TableRunQueue)tableFairQ.get(table); - } - - private ServerRunQueue getRunQueue(final ServerName sn) { - return (ServerRunQueue)serverFairQ.get(sn); - } - - /** - * Try to acquire the write lock on the specified table. - * other operations in the table-queue will be executed after the lock is released. - * @param table Table to lock - * @param purpose Human readable reason for locking the table - * @return true if we were able to acquire the lock on the table, otherwise false. - */ - public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) { - return getRunQueueOrCreate(table).tryExclusiveLock(lockManager, table, purpose); - } - - /** - * Release the write lock taken with tryAcquireTableWrite() - * @param table the name of the table that has the write lock - */ - public void releaseTableExclusiveLock(final TableName table) { - getRunQueue(table).releaseExclusiveLock(lockManager, table); - } - - /** - * Try to acquire the read lock on the specified table. - * other read operations in the table-queue may be executed concurrently, - * otherwise they have to wait until all the read-locks are released. - * @param table Table to lock - * @param purpose Human readable reason for locking the table - * @return true if we were able to acquire the lock on the table, otherwise false. - */ - public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) { - return getRunQueueOrCreate(table).trySharedLock(lockManager, table, purpose); - } - - /** - * Release the read lock taken with tryAcquireTableRead() - * @param table the name of the table that has the read lock - */ - public void releaseTableSharedLock(final TableName table) { - getRunQueue(table).releaseSharedLock(lockManager, table); - } - - /** - * Try to acquire the write lock on the specified server. - * @see #releaseServerExclusiveLock(ServerProcedureInterface) - * @param spi Server to lock - * @return true if we were able to acquire the lock on the server, otherwise false. - */ - public boolean tryAcquireServerExclusiveLock(final ServerProcedureInterface spi) { - return getRunQueueOrCreate(spi).tryExclusiveLock(); - } - - /** - * Release the write lock - * @see #tryAcquireServerExclusiveLock(ServerProcedureInterface) - * @param spi the server that has the write lock - */ - public void releaseServerExclusiveLock(final ServerProcedureInterface spi) { - getRunQueue(spi.getServerName()).releaseExclusiveLock(); - } - - /** - * Try to acquire the read lock on the specified server. - * @see #releaseServerSharedLock(ServerProcedureInterface) - * @param spi Server to lock - * @return true if we were able to acquire the lock on the server, otherwise false. - */ - public boolean tryAcquireServerSharedLock(final ServerProcedureInterface spi) { - return getRunQueueOrCreate(spi).trySharedLock(); - } - - /** - * Release the read lock taken - * @see #tryAcquireServerSharedLock(ServerProcedureInterface) - * @param spi the server that has the read lock - */ - public void releaseServerSharedLock(final ServerProcedureInterface spi) { - getRunQueue(spi.getServerName()).releaseSharedLock(); - } - - /** - * Tries to remove the queue and the table-lock of the specified table. - * If there are new operations pending (e.g. a new create), - * the remove will not be performed. - * @param table the name of the table that should be marked as deleted - * @return true if deletion succeeded, false otherwise meaning that there are - * other new operations pending for that table (e.g. a new create). - */ - protected boolean markTableAsDeleted(final TableName table) { - TableRunQueue queue = getRunQueue(table); - if (queue != null) { - lock.lock(); - try { - if (queue.isEmpty() && queue.acquireDeleteLock()) { - tableFairQ.remove(table); - - // Remove the table lock - try { - lockManager.tableDeleted(table); - } catch (IOException e) { - LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical - } - } else { - // TODO: If there are no create, we can drop all the other ops - return false; - } - } finally { - lock.unlock(); - } - } - return true; - } - - private interface RunQueue extends ProcedureFairRunQueues.FairObject { - void addFront(Procedure proc); - void addBack(Procedure proc); - Long poll(); - boolean acquireDeleteLock(); - } - - /** - * Base abstract class for RunQueue implementations. - * Be careful honoring synchronizations in subclasses. In here we protect access but if you are - * acting on a state found in here, be sure dependent code keeps synchronization. - * Implements basic in-memory read/write locking mechanism to prevent procedure steps being run - * in parallel. - */ - private static abstract class AbstractRunQueue implements RunQueue { - // All modification of runnables happens with #lock held. - private final Deque runnables = new ArrayDeque(); - private final int priority; - private boolean exclusiveLock = false; - private int sharedLock = 0; - - public AbstractRunQueue(int priority) { - this.priority = priority; - } - - boolean isEmpty() { - return this.runnables.isEmpty(); - } - - @Override - public boolean isAvailable() { - synchronized (this) { - return !exclusiveLock && !runnables.isEmpty(); - } - } - - @Override - public int getPriority() { - return this.priority; - } - - @Override - public void addFront(Procedure proc) { - this.runnables.addFirst(proc.getProcId()); - } - - @Override - public void addBack(Procedure proc) { - this.runnables.addLast(proc.getProcId()); - } - - @Override - public Long poll() { - return this.runnables.poll(); - } - - @Override - public synchronized boolean acquireDeleteLock() { - return tryExclusiveLock(); - } - - public synchronized boolean isLocked() { - return isExclusiveLock() || sharedLock > 0; - } - - public synchronized boolean isExclusiveLock() { - return this.exclusiveLock; - } - - public synchronized boolean trySharedLock() { - if (isExclusiveLock()) return false; - sharedLock++; - return true; - } - - public synchronized void releaseSharedLock() { - sharedLock--; - } - - /** - * @return True if only one instance of a shared lock outstanding. - */ - synchronized boolean isSingleSharedLock() { - return sharedLock == 1; - } - - public synchronized boolean tryExclusiveLock() { - if (isLocked()) return false; - exclusiveLock = true; - return true; - } - - public synchronized void releaseExclusiveLock() { - exclusiveLock = false; - } - - @Override - public String toString() { - return this.runnables.toString(); - } - } - - /** - * Run Queue for Server procedures. - */ - private static class ServerRunQueue extends AbstractRunQueue { - public ServerRunQueue(int priority) { - super(priority); - } - } - - /** - * Run Queue for a Table. It contains a read-write lock that is used by the - * MasterProcedureQueue to decide if we should fetch an item from this queue - * or skip to another one which will be able to run without waiting for locks. - */ - private static class TableRunQueue extends AbstractRunQueue { - private TableLock tableLock = null; - - public TableRunQueue(int priority) { - super(priority); - } - - // TODO: Improve run-queue push with TableProcedureInterface.getType() - // we can take smart decisions based on the type of the operation (e.g. create/delete) - @Override - public void addBack(final Procedure proc) { - super.addBack(proc); - } - - public synchronized boolean trySharedLock(final TableLockManager lockManager, - final TableName tableName, final String purpose) { - if (isExclusiveLock()) return false; - - // Take zk-read-lock - tableLock = lockManager.readLock(tableName, purpose); - try { - tableLock.acquire(); - } catch (IOException e) { - LOG.error("failed acquire read lock on " + tableName, e); - tableLock = null; - return false; - } - trySharedLock(); - return true; - } - - public synchronized void releaseSharedLock(final TableLockManager lockManager, - final TableName tableName) { - releaseTableLock(lockManager, isSingleSharedLock()); - releaseSharedLock(); - } - - public synchronized boolean tryExclusiveLock(final TableLockManager lockManager, - final TableName tableName, final String purpose) { - if (isLocked()) return false; - // Take zk-write-lock - tableLock = lockManager.writeLock(tableName, purpose); - try { - tableLock.acquire(); - } catch (IOException e) { - LOG.error("failed acquire write lock on " + tableName, e); - tableLock = null; - return false; - } - tryExclusiveLock(); - return true; - } - - public synchronized void releaseExclusiveLock(final TableLockManager lockManager, - final TableName tableName) { - releaseTableLock(lockManager, true); - releaseExclusiveLock(); - } - - private void releaseTableLock(final TableLockManager lockManager, boolean reset) { - for (int i = 0; i < 3; ++i) { - try { - tableLock.release(); - if (reset) { - tableLock = null; - } - break; - } catch (IOException e) { - LOG.warn("Could not release the table write-lock", e); - } - } - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java new file mode 100644 index 00000000000..9a3714f7323 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -0,0 +1,1241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUTKey WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet; + +/** + * ProcedureRunnableSet for the Master Procedures. + * This RunnableSet tries to provide to the ProcedureExecutor procedures + * that can be executed without having to wait on a lock. + * Most of the master operations can be executed concurrently, if they + * are operating on different tables (e.g. two create table can be performed + * at the same, time assuming table A and table B) or against two different servers; say + * two servers that crashed at about the same time. + * + *

Each procedure should implement an interface providing information for this queue. + * for example table related procedures should implement TableProcedureInterface. + * each procedure will be pushed in its own queue, and based on the operation type + * we may take smarter decision. e.g. we can abort all the operations preceding + * a delete table, or similar. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MasterProcedureScheduler implements ProcedureRunnableSet { + private static final Log LOG = LogFactory.getLog(MasterProcedureScheduler.class); + + private final TableLockManager lockManager; + private final ReentrantLock schedLock = new ReentrantLock(); + private final Condition schedWaitCond = schedLock.newCondition(); + + private final FairQueue serverRunQueue = new FairQueue(); + private final FairQueue tableRunQueue = new FairQueue(); + private int queueSize = 0; + + private final Object[] serverBuckets = new Object[128]; + private Queue namespaceMap = null; + private Queue tableMap = null; + + private final int metaTablePriority; + private final int userTablePriority; + private final int sysTablePriority; + + // TODO: metrics + private long pollCalls = 0; + private long nullPollCalls = 0; + + public MasterProcedureScheduler(final Configuration conf, final TableLockManager lockManager) { + this.lockManager = lockManager; + + // TODO: should this be part of the HTD? + metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3); + sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2); + userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1); + } + + @Override + public void addFront(Procedure proc) { + doAdd(proc, true); + } + + @Override + public void addBack(Procedure proc) { + doAdd(proc, false); + } + + @Override + public void yield(final Procedure proc) { + doAdd(proc, isTableProcedure(proc)); + } + + private void doAdd(final Procedure proc, final boolean addFront) { + schedLock.lock(); + try { + if (isTableProcedure(proc)) { + doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); + } else if (isServerProcedure(proc)) { + doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront); + } else { + // TODO: at the moment we only have Table and Server procedures + // if you are implementing a non-table/non-server procedure, you have two options: create + // a group for all the non-table/non-server procedures or try to find a key for your + // non-table/non-server procedures and implement something similar to the TableRunQueue. + throw new UnsupportedOperationException( + "RQs for non-table/non-server procedures are not implemented yet"); + } + schedWaitCond.signal(); + } finally { + schedLock.unlock(); + } + } + + private > void doAdd(final FairQueue fairq, + final Queue queue, final Procedure proc, final boolean addFront) { + queue.add(proc, addFront); + if (!(queue.isSuspended() || queue.hasExclusiveLock())) { + if (queue.size() == 1 && !IterableList.isLinked(queue)) { + fairq.add(queue); + } + queueSize++; + } + } + + @Override + public Procedure poll() { + return poll(-1); + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") + Procedure poll(long waitNsec) { + Procedure pollResult = null; + schedLock.lock(); + try { + if (queueSize == 0) { + if (waitNsec < 0) { + schedWaitCond.await(); + } else { + schedWaitCond.awaitNanos(waitNsec); + } + if (queueSize == 0) { + return null; + } + } + + // For now, let server handling have precedence over table handling; presumption is that it + // is more important handling crashed servers than it is running the + // enabling/disabling tables, etc. + pollResult = doPoll(serverRunQueue); + if (pollResult == null) { + pollResult = doPoll(tableRunQueue); + } + + // update metrics + pollCalls++; + nullPollCalls += (pollResult == null) ? 1 : 0; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + schedLock.unlock(); + } + return pollResult; + } + + private > Procedure doPoll(final FairQueue fairq) { + Queue rq = fairq.poll(); + if (rq == null || !rq.isAvailable()) { + return null; + } + + assert !rq.isSuspended() : "rq=" + rq + " is suspended"; + Procedure pollResult = rq.poll(); + this.queueSize--; + if (rq.isEmpty() || rq.requireExclusiveLock(pollResult)) { + removeFromRunQueue(fairq, rq); + } + return pollResult; + } + + @Override + public void clear() { + // NOTE: USED ONLY FOR TESTING + schedLock.lock(); + try { + // Remove Servers + for (int i = 0; i < serverBuckets.length; ++i) { + clear((ServerQueue)serverBuckets[i], serverRunQueue); + serverBuckets[i] = null; + } + + // Remove Tables + clear(tableMap, tableRunQueue); + tableMap = null; + + assert queueSize == 0 : "expected queue size to be 0, got " + queueSize; + } finally { + schedLock.unlock(); + } + } + + private > void clear(Queue treeMap, FairQueue fairq) { + while (treeMap != null) { + Queue node = AvlTree.getFirst(treeMap); + assert !node.isSuspended() : "can't clear suspended " + node.getKey(); + treeMap = AvlTree.remove(treeMap, node.getKey()); + removeFromRunQueue(fairq, node); + } + } + + @Override + public void signalAll() { + schedLock.lock(); + try { + schedWaitCond.signalAll(); + } finally { + schedLock.unlock(); + } + } + + @Override + public int size() { + schedLock.lock(); + try { + return queueSize; + } finally { + schedLock.unlock(); + } + } + + @Override + public void completionCleanup(Procedure proc) { + if (proc instanceof TableProcedureInterface) { + TableProcedureInterface iProcTable = (TableProcedureInterface)proc; + boolean tableDeleted; + if (proc.hasException()) { + IOException procEx = proc.getException().unwrapRemoteException(); + if (iProcTable.getTableOperationType() == TableOperationType.CREATE) { + // create failed because the table already exist + tableDeleted = !(procEx instanceof TableExistsException); + } else { + // the operation failed because the table does not exist + tableDeleted = (procEx instanceof TableNotFoundException); + } + } else { + // the table was deleted + tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE); + } + if (tableDeleted) { + markTableAsDeleted(iProcTable.getTableName()); + return; + } + } else { + // No cleanup for ServerProcedureInterface types, yet. + return; + } + } + + private > void addToRunQueue(FairQueue fairq, Queue queue) { + if (IterableList.isLinked(queue)) return; + if (!queue.isEmpty()) { + fairq.add(queue); + queueSize += queue.size(); + } + } + + private > void removeFromRunQueue(FairQueue fairq, Queue queue) { + if (!IterableList.isLinked(queue)) return; + fairq.remove(queue); + queueSize -= queue.size(); + } + + // ============================================================================ + // TODO: Metrics + // ============================================================================ + public long getPollCalls() { + return pollCalls; + } + + public long getNullPollCalls() { + return nullPollCalls; + } + + // ============================================================================ + // Event Helpers + // ============================================================================ + public boolean waitEvent(ProcedureEvent event, Procedure procedure) { + return waitEvent(event, procedure, false); + } + + public boolean waitEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) { + synchronized (event) { + if (event.isReady()) { + return false; + } + + // TODO: Suspend single procedure not implemented yet, fallback to suspending the queue + if (!suspendQueue) suspendQueue = true; + + if (isTableProcedure(procedure)) { + suspendTableQueue(event, getTableName(procedure)); + } else if (isServerProcedure(procedure)) { + suspendServerQueue(event, getServerName(procedure)); + } else { + // TODO: at the moment we only have Table and Server procedures + // if you are implementing a non-table/non-server procedure, you have two options: create + // a group for all the non-table/non-server procedures or try to find a key for your + // non-table/non-server procedures and implement something similar to the TableRunQueue. + throw new UnsupportedOperationException( + "RQs for non-table/non-server procedures are not implemented yet"); + } + } + return true; + } + + private void suspendTableQueue(ProcedureEvent event, TableName tableName) { + schedLock.lock(); + try { + TableQueue queue = getTableQueue(tableName); + if (!queue.setSuspended(true)) return; + + if (LOG.isDebugEnabled()) { + LOG.debug("Suspend table queue " + tableName); + } + removeFromRunQueue(tableRunQueue, queue); + event.suspendTableQueue(queue); + } finally { + schedLock.unlock(); + } + } + + private void suspendServerQueue(ProcedureEvent event, ServerName serverName) { + schedLock.lock(); + try { + // TODO: This will change once we have the new AM + ServerQueue queue = getServerQueue(serverName); + if (!queue.setSuspended(true)) return; + + if (LOG.isDebugEnabled()) { + LOG.debug("Suspend server queue " + serverName); + } + removeFromRunQueue(serverRunQueue, queue); + event.suspendServerQueue(queue); + } finally { + schedLock.unlock(); + } + } + + public void suspend(ProcedureEvent event) { + synchronized (event) { + event.setReady(false); + if (LOG.isDebugEnabled()) { + LOG.debug("Suspend event " + event); + } + } + } + + public void wake(ProcedureEvent event) { + synchronized (event) { + event.setReady(true); + if (LOG.isDebugEnabled()) { + LOG.debug("Wake event " + event); + } + + schedLock.lock(); + try { + while (event.hasWaitingTables()) { + Queue queue = event.popWaitingTable(); + addToRunQueue(tableRunQueue, queue); + } + // TODO: This will change once we have the new AM + while (event.hasWaitingServers()) { + Queue queue = event.popWaitingServer(); + addToRunQueue(serverRunQueue, queue); + } + + if (queueSize > 1) { + schedWaitCond.signalAll(); + } else if (queueSize > 0) { + schedWaitCond.signal(); + } + } finally { + schedLock.unlock(); + } + } + } + + public static class ProcedureEvent { + private final String description; + + private Queue waitingServers = null; + private Queue waitingTables = null; + private boolean ready = false; + + public ProcedureEvent(String description) { + this.description = description; + } + + public synchronized boolean isReady() { + return ready; + } + + private synchronized void setReady(boolean isReady) { + this.ready = isReady; + } + + private void suspendTableQueue(Queue queue) { + waitingTables = IterableList.append(waitingTables, queue); + } + + private void suspendServerQueue(Queue queue) { + waitingServers = IterableList.append(waitingServers, queue); + } + + private boolean hasWaitingTables() { + return waitingTables != null; + } + + private Queue popWaitingTable() { + Queue node = waitingTables; + waitingTables = IterableList.remove(waitingTables, node); + node.setSuspended(false); + return node; + } + + private boolean hasWaitingServers() { + return waitingServers != null; + } + + private Queue popWaitingServer() { + Queue node = waitingServers; + waitingServers = IterableList.remove(waitingServers, node); + node.setSuspended(false); + return node; + } + + @Override + public String toString() { + return String.format("ProcedureEvent(%s)", description); + } + } + + // ============================================================================ + // Table Queue Lookup Helpers + // ============================================================================ + private TableQueue getTableQueueWithLock(TableName tableName) { + schedLock.lock(); + try { + return getTableQueue(tableName); + } finally { + schedLock.unlock(); + } + } + + private TableQueue getTableQueue(TableName tableName) { + Queue node = AvlTree.get(tableMap, tableName); + if (node != null) return (TableQueue)node; + + node = new TableQueue(tableName, getTablePriority(tableName)); + tableMap = AvlTree.insert(tableMap, node); + return (TableQueue)node; + } + + private void removeTableQueue(TableName tableName) { + tableMap = AvlTree.remove(tableMap, tableName); + } + + private int getTablePriority(TableName tableName) { + if (tableName.equals(TableName.META_TABLE_NAME)) { + return metaTablePriority; + } else if (tableName.isSystemTable()) { + return sysTablePriority; + } + return userTablePriority; + } + + private static boolean isTableProcedure(Procedure proc) { + return proc instanceof TableProcedureInterface; + } + + private static TableName getTableName(Procedure proc) { + return ((TableProcedureInterface)proc).getTableName(); + } + + // ============================================================================ + // Server Queue Lookup Helpers + // ============================================================================ + private ServerQueue getServerQueueWithLock(ServerName serverName) { + schedLock.lock(); + try { + return getServerQueue(serverName); + } finally { + schedLock.unlock(); + } + } + + private ServerQueue getServerQueue(ServerName serverName) { + int index = getBucketIndex(serverBuckets, serverName.hashCode()); + Queue root = getTreeRoot(serverBuckets, index); + Queue node = AvlTree.get(root, serverName); + if (node != null) return (ServerQueue)node; + + node = new ServerQueue(serverName); + serverBuckets[index] = AvlTree.insert(root, node); + return (ServerQueue)node; + } + + private void removeServerQueue(ServerName serverName) { + int index = getBucketIndex(serverBuckets, serverName.hashCode()); + serverBuckets[index] = AvlTree.remove((ServerQueue)serverBuckets[index], serverName); + } + + @SuppressWarnings("unchecked") + private static > Queue getTreeRoot(Object[] buckets, int index) { + return (Queue) buckets[index]; + } + + private static int getBucketIndex(Object[] buckets, int hashCode) { + return Math.abs(hashCode) % buckets.length; + } + + private static boolean isServerProcedure(Procedure proc) { + return proc instanceof ServerProcedureInterface; + } + + private static ServerName getServerName(Procedure proc) { + return ((ServerProcedureInterface)proc).getServerName(); + } + + // ============================================================================ + // Table and Server Queue Implementation + // ============================================================================ + public static class ServerQueue extends QueueImpl { + public ServerQueue(ServerName serverName) { + super(serverName); + } + + public boolean requireExclusiveLock(Procedure proc) { + ServerProcedureInterface spi = (ServerProcedureInterface)proc; + switch (spi.getServerOperationType()) { + case CRASH_HANDLER: + return true; + default: + break; + } + throw new UnsupportedOperationException("unexpected type " + spi.getServerOperationType()); + } + } + + public static class TableQueue extends QueueImpl { + private TableLock tableLock = null; + + public TableQueue(TableName tableName, int priority) { + super(tableName, priority); + } + + // TODO: We can abort pending/in-progress operation if the new call is + // something like drop table. We can Override addBack(), + // check the type and abort all the in-flight procedurs. + private boolean canAbortPendingOperations(Procedure proc) { + TableProcedureInterface tpi = (TableProcedureInterface)proc; + switch (tpi.getTableOperationType()) { + case DELETE: + return true; + default: + return false; + } + } + + public boolean requireExclusiveLock(Procedure proc) { + TableProcedureInterface tpi = (TableProcedureInterface)proc; + switch (tpi.getTableOperationType()) { + case CREATE: + case DELETE: + case DISABLE: + case EDIT: + case ENABLE: + return true; + case READ: + return false; + default: + break; + } + throw new UnsupportedOperationException("unexpected type " + tpi.getTableOperationType()); + } + + private synchronized boolean trySharedLock(final TableLockManager lockManager, + final String purpose) { + if (hasExclusiveLock()) return false; + + // Take zk-read-lock + TableName tableName = getKey(); + tableLock = lockManager.readLock(tableName, purpose); + try { + tableLock.acquire(); + } catch (IOException e) { + LOG.error("failed acquire read lock on " + tableName, e); + tableLock = null; + return false; + } + + trySharedLock(); + return true; + } + + private synchronized void releaseSharedLock(final TableLockManager lockManager) { + releaseTableLock(lockManager, isSingleSharedLock()); + releaseSharedLock(); + } + + private synchronized boolean tryZkExclusiveLock(final TableLockManager lockManager, + final String purpose) { + // Take zk-write-lock + TableName tableName = getKey(); + tableLock = lockManager.writeLock(tableName, purpose); + try { + tableLock.acquire(); + } catch (IOException e) { + LOG.error("failed acquire write lock on " + tableName, e); + tableLock = null; + return false; + } + return true; + } + + private synchronized void releaseZkExclusiveLock(final TableLockManager lockManager) { + releaseTableLock(lockManager, true); + } + + private void releaseTableLock(final TableLockManager lockManager, boolean reset) { + for (int i = 0; i < 3; ++i) { + try { + tableLock.release(); + if (reset) { + tableLock = null; + } + break; + } catch (IOException e) { + LOG.warn("Could not release the table write-lock", e); + } + } + } + } + + // ============================================================================ + // Locking Helpers + // ============================================================================ + /** + * Try to acquire the exclusive lock on the specified table. + * other operations in the table-queue will be executed after the lock is released. + * @param table Table to lock + * @param purpose Human readable reason for locking the table + * @return true if we were able to acquire the lock on the table, otherwise false. + */ + public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) { + schedLock.lock(); + TableQueue queue = getTableQueue(table); + boolean hasXLock = queue.tryExclusiveLock(); + if (!hasXLock) { + schedLock.unlock(); + return false; + } + + removeFromRunQueue(tableRunQueue, queue); + schedLock.unlock(); + + // Zk lock is expensive... + hasXLock = queue.tryZkExclusiveLock(lockManager, purpose); + if (!hasXLock) { + schedLock.lock(); + queue.releaseExclusiveLock(); + addToRunQueue(tableRunQueue, queue); + schedLock.unlock(); + } + return hasXLock; + } + + /** + * Release the exclusive lock taken with tryAcquireTableWrite() + * @param table the name of the table that has the exclusive lock + */ + public void releaseTableExclusiveLock(final TableName table) { + schedLock.lock(); + TableQueue queue = getTableQueue(table); + schedLock.unlock(); + + // Zk lock is expensive... + queue.releaseZkExclusiveLock(lockManager); + + schedLock.lock(); + queue.releaseExclusiveLock(); + addToRunQueue(tableRunQueue, queue); + schedLock.unlock(); + } + + /** + * Try to acquire the shared lock on the specified table. + * other "read" operations in the table-queue may be executed concurrently, + * @param table Table to lock + * @param purpose Human readable reason for locking the table + * @return true if we were able to acquire the lock on the table, otherwise false. + */ + public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) { + return getTableQueueWithLock(table).trySharedLock(lockManager, purpose); + } + + /** + * Release the shared lock taken with tryAcquireTableRead() + * @param table the name of the table that has the shared lock + */ + public void releaseTableSharedLock(final TableName table) { + getTableQueueWithLock(table).releaseSharedLock(lockManager); + } + + /** + * Tries to remove the queue and the table-lock of the specified table. + * If there are new operations pending (e.g. a new create), + * the remove will not be performed. + * @param table the name of the table that should be marked as deleted + * @return true if deletion succeeded, false otherwise meaning that there are + * other new operations pending for that table (e.g. a new create). + */ + protected boolean markTableAsDeleted(final TableName table) { + final ReentrantLock l = schedLock; + l.lock(); + try { + TableQueue queue = getTableQueue(table); + if (queue == null) return true; + + if (queue.isEmpty() && queue.acquireDeleteLock()) { + // remove the table from the run-queue and the map + if (IterableList.isLinked(queue)) { + tableRunQueue.remove(queue); + } + + // Remove the table lock + try { + lockManager.tableDeleted(table); + } catch (IOException e) { + LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical + } + + removeTableQueue(table); + } else { + // TODO: If there are no create, we can drop all the other ops + return false; + } + } finally { + l.unlock(); + } + return true; + } + + // ============================================================================ + // Server Locking Helpers + // ============================================================================ + /** + * Release the exclusive lock + * @see #tryAcquireServerExclusiveLock(ServerName) + * @param serverName the server that has the exclusive lock + */ + public boolean tryAcquireServerExclusiveLock(final ServerName serverName) { + schedLock.lock(); + try { + ServerQueue queue = getServerQueue(serverName); + if (queue.tryExclusiveLock()) { + removeFromRunQueue(serverRunQueue, queue); + return true; + } + } finally { + schedLock.unlock(); + } + return false; + } + + /** + * Release the exclusive lock + * @see #tryAcquireServerExclusiveLock(ServerName) + * @param serverName the server that has the exclusive lock + */ + public void releaseServerExclusiveLock(final ServerName serverName) { + schedLock.lock(); + try { + ServerQueue queue = getServerQueue(serverName); + queue.releaseExclusiveLock(); + addToRunQueue(serverRunQueue, queue); + } finally { + schedLock.unlock(); + } + } + + /** + * Try to acquire the shared lock on the specified server. + * @see #releaseServerSharedLock(ServerName) + * @param serverName Server to lock + * @return true if we were able to acquire the lock on the server, otherwise false. + */ + public boolean tryAcquireServerSharedLock(final ServerName serverName) { + return getServerQueueWithLock(serverName).trySharedLock(); + } + + /** + * Release the shared lock taken + * @see #tryAcquireServerSharedLock(ServerName) + * @param serverName the server that has the shared lock + */ + public void releaseServerSharedLock(final ServerName serverName) { + getServerQueueWithLock(serverName).releaseSharedLock(); + } + + // ============================================================================ + // Generic Helpers + // ============================================================================ + private static interface QueueInterface { + boolean isAvailable(); + boolean isEmpty(); + int size(); + void add(Procedure proc, boolean addFront); + boolean requireExclusiveLock(Procedure proc); + Procedure poll(); + + boolean isSuspended(); + } + + private static abstract class Queue> implements QueueInterface { + private Queue avlRight = null; + private Queue avlLeft = null; + private int avlHeight = 1; + + private Queue iterNext = null; + private Queue iterPrev = null; + private boolean suspended = false; + + private boolean exclusiveLock = false; + private int sharedLock = 0; + + private final TKey key; + private final int priority; + + public Queue(TKey key) { + this(key, 1); + } + + public Queue(TKey key, int priority) { + this.key = key; + this.priority = priority; + } + + protected TKey getKey() { + return key; + } + + protected int getPriority() { + return priority; + } + + /** + * True if the queue is not in the run-queue and it is owned by an event. + */ + public boolean isSuspended() { + return suspended; + } + + protected boolean setSuspended(boolean isSuspended) { + if (this.suspended == isSuspended) return false; + this.suspended = isSuspended; + return true; + } + + // ====================================================================== + // Read/Write Locking helpers + // ====================================================================== + public synchronized boolean isLocked() { + return hasExclusiveLock() || sharedLock > 0; + } + + public synchronized boolean hasExclusiveLock() { + return this.exclusiveLock; + } + + public synchronized boolean trySharedLock() { + if (hasExclusiveLock()) return false; + sharedLock++; + return true; + } + + public synchronized void releaseSharedLock() { + sharedLock--; + } + + protected synchronized boolean isSingleSharedLock() { + return sharedLock == 1; + } + + public synchronized boolean tryExclusiveLock() { + if (isLocked()) return false; + exclusiveLock = true; + return true; + } + + public synchronized void releaseExclusiveLock() { + exclusiveLock = false; + } + + public synchronized boolean acquireDeleteLock() { + return tryExclusiveLock(); + } + + // This should go away when we have the new AM and its events + // and we move xlock to the lock-event-queue. + public synchronized boolean isAvailable() { + return !exclusiveLock && !isEmpty(); + } + + // ====================================================================== + // Generic Helpers + // ====================================================================== + public int compareKey(TKey cmpKey) { + return key.compareTo(cmpKey); + } + + public int compareTo(Queue other) { + return compareKey(other.key); + } + + @Override + public String toString() { + return String.format("%s(%s)", getClass().getSimpleName(), key); + } + } + + // ====================================================================== + // Helper Data Structures + // ====================================================================== + private static abstract class QueueImpl> extends Queue { + private final ArrayDeque runnables = new ArrayDeque(); + + public QueueImpl(TKey key) { + super(key); + } + + public QueueImpl(TKey key, int priority) { + super(key, priority); + } + + public void add(final Procedure proc, final boolean addToFront) { + if (addToFront) { + addFront(proc); + } else { + addBack(proc); + } + } + + protected void addFront(final Procedure proc) { + runnables.addFirst(proc); + } + + protected void addBack(final Procedure proc) { + runnables.addLast(proc); + } + + @Override + public Procedure poll() { + return runnables.poll(); + } + + @Override + public boolean isEmpty() { + return runnables.isEmpty(); + } + + public int size() { + return runnables.size(); + } + } + + private static class FairQueue> { + private final int quantum; + + private Queue currentQueue = null; + private Queue queueHead = null; + private int currentQuantum = 0; + + public FairQueue() { + this(1); + } + + public FairQueue(int quantum) { + this.quantum = quantum; + } + + public void add(Queue queue) { + queueHead = IterableList.append(queueHead, queue); + if (currentQueue == null) setNextQueue(queueHead); + } + + public void remove(Queue queue) { + Queue nextQueue = queue.iterNext; + queueHead = IterableList.remove(queueHead, queue); + if (currentQueue == queue) { + setNextQueue(queueHead != null ? nextQueue : null); + } + } + + public Queue poll() { + if (currentQuantum == 0) { + if (!nextQueue()) { + return null; // nothing here + } + currentQuantum = calculateQuantum(currentQueue) - 1; + } else { + currentQuantum--; + } + + // This should go away when we have the new AM and its events + if (!currentQueue.isAvailable()) { + Queue lastQueue = currentQueue; + do { + if (!nextQueue()) + return null; + } while (currentQueue != lastQueue && !currentQueue.isAvailable()); + + currentQuantum = calculateQuantum(currentQueue) - 1; + } + return currentQueue; + } + + private boolean nextQueue() { + if (currentQueue == null) return false; + currentQueue = currentQueue.iterNext; + return currentQueue != null; + } + + private void setNextQueue(Queue queue) { + currentQueue = queue; + if (queue != null) { + currentQuantum = calculateQuantum(currentQueue); + } else { + currentQuantum = 0; + } + } + + private int calculateQuantum(final Queue queue) { + return Math.max(1, queue.getPriority() * quantum); // TODO + } + } + + private static class AvlTree { + public static > Queue get(Queue root, T key) { + while (root != null) { + int cmp = root.compareKey(key); + if (cmp > 0) { + root = root.avlLeft; + } else if (cmp < 0) { + root = root.avlRight; + } else { + return root; + } + } + return null; + } + + public static > Queue getFirst(Queue root) { + if (root != null) { + while (root.avlLeft != null) { + root = root.avlLeft; + } + } + return root; + } + + public static > Queue getLast(Queue root) { + if (root != null) { + while (root.avlRight != null) { + root = root.avlRight; + } + } + return root; + } + + public static > Queue insert(Queue root, Queue node) { + if (root == null) return node; + if (node.compareTo(root) < 0) { + root.avlLeft = insert(root.avlLeft, node); + } else { + root.avlRight = insert(root.avlRight, node); + } + return balance(root); + } + + private static > Queue removeMin(Queue p) { + if (p.avlLeft == null) + return p.avlRight; + p.avlLeft = removeMin(p.avlLeft); + return balance(p); + } + + public static > Queue remove(Queue root, T key) { + if (root == null) return null; + + int cmp = root.compareKey(key); + if (cmp == 0) { + Queue q = root.avlLeft; + Queue r = root.avlRight; + if (r == null) return q; + Queue min = getFirst(r); + min.avlRight = removeMin(r); + min.avlLeft = q; + return balance(min); + } else if (cmp > 0) { + root.avlLeft = remove(root.avlLeft, key); + } else /* if (cmp < 0) */ { + root.avlRight = remove(root.avlRight, key); + } + return balance(root); + } + + private static > Queue balance(Queue p) { + fixHeight(p); + int balance = balanceFactor(p); + if (balance == 2) { + if (balanceFactor(p.avlRight) < 0) { + p.avlRight = rotateRight(p.avlRight); + } + return rotateLeft(p); + } else if (balance == -2) { + if (balanceFactor(p.avlLeft) > 0) { + p.avlLeft = rotateLeft(p.avlLeft); + } + return rotateRight(p); + } + return p; + } + + private static > Queue rotateRight(Queue p) { + Queue q = p.avlLeft; + p.avlLeft = q.avlRight; + q.avlRight = p; + fixHeight(p); + fixHeight(q); + return q; + } + + private static > Queue rotateLeft(Queue q) { + Queue p = q.avlRight; + q.avlRight = p.avlLeft; + p.avlLeft = q; + fixHeight(q); + fixHeight(p); + return p; + } + + private static > void fixHeight(Queue node) { + int heightLeft = height(node.avlLeft); + int heightRight = height(node.avlRight); + node.avlHeight = 1 + Math.max(heightLeft, heightRight); + } + + private static > int height(Queue node) { + return node != null ? node.avlHeight : 0; + } + + private static > int balanceFactor(Queue node) { + return height(node.avlRight) - height(node.avlLeft); + } + } + + private static class IterableList { + public static > Queue prepend(Queue head, Queue node) { + assert !isLinked(node) : node + " is already linked"; + if (head != null) { + Queue tail = head.iterPrev; + tail.iterNext = node; + head.iterPrev = node; + node.iterNext = head; + node.iterPrev = tail; + } else { + node.iterNext = node; + node.iterPrev = node; + } + return node; + } + + public static > Queue append(Queue head, Queue node) { + assert !isLinked(node) : node + " is already linked"; + if (head != null) { + Queue tail = head.iterPrev; + tail.iterNext = node; + node.iterNext = head; + node.iterPrev = tail; + head.iterPrev = node; + return head; + } + node.iterNext = node; + node.iterPrev = node; + return node; + } + + public static > Queue appendList(Queue head, Queue otherHead) { + if (head == null) return otherHead; + if (otherHead == null) return head; + + Queue tail = head.iterPrev; + Queue otherTail = otherHead.iterPrev; + tail.iterNext = otherHead; + otherHead.iterPrev = tail; + otherTail.iterNext = head; + head.iterPrev = otherTail; + return head; + } + + private static > Queue remove(Queue head, Queue node) { + assert isLinked(node) : node + " is not linked"; + if (node != node.iterNext) { + node.iterPrev.iterNext = node.iterNext; + node.iterNext.iterPrev = node.iterPrev; + head = (head == node) ? node.iterNext : head; + } else { + head = null; + } + node.iterNext = null; + node.iterPrev = null; + return head; + } + + private static > boolean isLinked(Queue node) { + return node.iterPrev != null && node.iterNext != null; + } + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java index a6f97daa1b1..bd4f9e56633 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -182,10 +181,8 @@ public class ModifyColumnFamilyProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - tableName, - EventType.C_M_MODIFY_FAMILY.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "modify family"); } @Override @@ -379,4 +376,4 @@ public class ModifyColumnFamilyProcedure }); } } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java index 13a249611d1..329f7174c60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java @@ -215,10 +215,8 @@ public class ModifyTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - getTableName(), - EventType.C_M_MODIFY_TABLE.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "modify table"); } @Override @@ -508,4 +506,4 @@ public class ModifyTableProcedure } return regionInfoList; } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index 9e0b86e8996..cb8b637a80e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -588,13 +588,13 @@ implements ServerProcedureInterface { @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.getMasterServices().isServerCrashProcessingEnabled()) return false; - return env.getProcedureQueue().tryAcquireServerExclusiveLock(this); + if (env.waitServerCrashProcessingEnabled(this)) return false; + return env.getProcedureQueue().tryAcquireServerExclusiveLock(getServerName()); } @Override protected void releaseLock(final MasterProcedureEnv env) { - env.getProcedureQueue().releaseServerExclusiveLock(this); + env.getProcedureQueue().releaseServerExclusiveLock(getServerName()); } @Override @@ -788,6 +788,11 @@ implements ServerProcedureInterface { return this.carryingMeta; } + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.CRASH_HANDLER; + } + /** * For this procedure, yield at end of each successful flow step so that all crashed servers * can make progress rather than do the default which has each procedure running to completion diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java index 5b0c45f8f4c..b5c24ff78c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java @@ -28,6 +28,10 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Evolving public interface ServerProcedureInterface { + public enum ServerOperationType { + CRASH_HANDLER + }; + /** * @return Name of this server instance. */ @@ -37,4 +41,12 @@ public interface ServerProcedureInterface { * @return True if this server has an hbase:meta table region. */ boolean hasMetaTableRegion(); -} \ No newline at end of file + + /** + * Given an operation type we can take decisions about what to do with pending operations. + * e.g. if we get a crash handler and we have some assignment operation pending + * we can abort those operations. + * @return the operation type that the procedure is executing. + */ + ServerOperationType getServerOperationType(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java index 3c1e5934f97..da220f4f222 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java @@ -181,7 +181,7 @@ public class TruncateTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; + if (env.waitInitialized(this)) return false; return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "truncate table"); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java index 70cb2fc2088..34715aad5a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java @@ -129,14 +129,14 @@ public class TestMaster { MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); HMaster m = cluster.getMaster(); try { - m.initialized = false; // fake it, set back later + m.setInitialized(false); // fake it, set back later HRegionInfo meta = HRegionInfo.FIRST_META_REGIONINFO; m.move(meta.getEncodedNameAsBytes(), null); fail("Region should not be moved since master is not initialized"); } catch (IOException ioe) { assertTrue(ioe instanceof PleaseHoldException); } finally { - m.initialized = true; + m.setInitialized(true); } } @@ -173,13 +173,13 @@ public class TestMaster { try { List tableRegions = admin.getTableRegions(tableName); - master.initialized = false; // fake it, set back later + master.setInitialized(false); // fake it, set back later admin.move(tableRegions.get(0).getEncodedNameAsBytes(), null); fail("Region should not be moved since master is not initialized"); } catch (IOException ioe) { assertTrue(StringUtils.stringifyException(ioe).contains("PleaseHoldException")); } finally { - master.initialized = true; + master.setInitialized(true); TEST_UTIL.deleteTable(tableName); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java index fe93bb552c4..e27b3a4bf08 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java @@ -308,7 +308,7 @@ public class TestMasterNoCluster { try { // Wait till master is initialized. - while (!master.initialized) Threads.sleep(10); + while (!master.isInitialized()) Threads.sleep(10); LOG.info("Master is initialized"); assertFalse("The dead server should not be pulled in", diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java new file mode 100644 index 00000000000..0027c2f21e5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@Category({MasterTests.class, MediumTests.class}) +public class TestMasterProcedureEvents { + private static final Log LOG = LogFactory.getLog(TestCreateTableProcedure.class); + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static long nonceGroup = HConstants.NO_NONCE; + private static long nonce = HConstants.NO_NONCE; + + private static void setupConf(Configuration conf) { + conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8); + conf.setBoolean("hbase.procedure.store.wal.use.hsync", false); + } + + @BeforeClass + public static void setupCluster() throws Exception { + setupConf(UTIL.getConfiguration()); + UTIL.startMiniCluster(3); + } + + @AfterClass + public static void cleanupTest() throws Exception { + try { + UTIL.shutdownMiniCluster(); + } catch (Exception e) { + LOG.warn("failure shutting down cluster", e); + } + } + + @Test + public void testMasterInitializedEvent() throws Exception { + TableName tableName = TableName.valueOf("testMasterInitializedEvent"); + HMaster master = UTIL.getMiniHBaseCluster().getMaster(); + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue(); + + HRegionInfo hri = new HRegionInfo(tableName); + HTableDescriptor htd = new HTableDescriptor(tableName); + HColumnDescriptor hcd = new HColumnDescriptor("f"); + htd.addFamily(hcd); + + while (!master.isInitialized()) Thread.sleep(250); + master.setInitialized(false); // fake it, set back later + + CreateTableProcedure proc = new CreateTableProcedure( + procExec.getEnvironment(), htd, new HRegionInfo[] { hri }); + + long pollCalls = procSched.getPollCalls(); + long nullPollCalls = procSched.getNullPollCalls(); + + long procId = procExec.submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE); + for (int i = 0; i < 10; ++i) { + Thread.sleep(100); + assertEquals(pollCalls + 1, procSched.getPollCalls()); + assertEquals(nullPollCalls, procSched.getNullPollCalls()); + } + + master.setInitialized(true); + ProcedureTestingUtility.waitProcedure(procExec, procId); + + assertEquals(pollCalls + 2, procSched.getPollCalls()); + assertEquals(nullPollCalls, procSched.getNullPollCalls()); + } + + @Test + public void testServerCrashProcedureEvent() throws Exception { + TableName tableName = TableName.valueOf("testServerCrashProcedureEventTb"); + HMaster master = UTIL.getMiniHBaseCluster().getMaster(); + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue(); + + while (!master.isServerCrashProcessingEnabled() || !master.isInitialized() || + master.getAssignmentManager().getRegionStates().isRegionsInTransition()) { + Thread.sleep(25); + } + + UTIL.createTable(tableName, HBaseTestingUtility.COLUMNS[0]); + try (Table t = UTIL.getConnection().getTable(tableName)) { + // Load the table with a bit of data so some logs to split and some edits in each region. + UTIL.loadTable(t, HBaseTestingUtility.COLUMNS[0]); + } + + master.setServerCrashProcessingEnabled(false); // fake it, set back later + + long pollCalls = procSched.getPollCalls(); + long nullPollCalls = procSched.getNullPollCalls(); + + // Kill a server. Master will notice but do nothing other than add it to list of dead servers. + HRegionServer hrs = getServerWithRegions(); + boolean carryingMeta = master.getAssignmentManager() + .isCarryingMeta(hrs.getServerName()) == AssignmentManager.ServerHostRegion.HOSTING_REGION; + UTIL.getHBaseCluster().killRegionServer(hrs.getServerName()); + hrs.join(); + + // Wait until the expiration of the server has arrived at the master. We won't process it + // by queuing a ServerCrashProcedure because we have disabled crash processing... but wait + // here so ServerManager gets notice and adds expired server to appropriate queues. + while (!master.getServerManager().isServerDead(hrs.getServerName())) Thread.sleep(10); + + // Do some of the master processing of dead servers so when SCP runs, it has expected 'state'. + master.getServerManager().moveFromOnelineToDeadServers(hrs.getServerName()); + + long procId = procExec.submitProcedure( + new ServerCrashProcedure(hrs.getServerName(), true, carryingMeta)); + + for (int i = 0; i < 10; ++i) { + Thread.sleep(100); + assertEquals(pollCalls + 1, procSched.getPollCalls()); + assertEquals(nullPollCalls, procSched.getNullPollCalls()); + } + + // Now, reenable processing else we can't get a lock on the ServerCrashProcedure. + master.setServerCrashProcessingEnabled(true); + ProcedureTestingUtility.waitProcedure(procExec, procId); + + LOG.debug("server crash processing poll calls: " + procSched.getPollCalls()); + assertTrue(procSched.getPollCalls() >= (pollCalls + 2)); + assertEquals(nullPollCalls, procSched.getNullPollCalls()); + + UTIL.deleteTable(tableName); + } + + private HRegionServer getServerWithRegions() { + for (int i = 0; i < 3; ++i) { + HRegionServer hrs = UTIL.getHBaseCluster().getRegionServer(i); + if (hrs.getNumberOfOnlineRegions() > 0) { + return hrs; + } + } + return null; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java similarity index 90% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index 68384ce7f4d..2b92e524c94 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -18,10 +18,6 @@ package org.apache.hadoop.hbase.master.procedure; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -39,23 +35,29 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category(SmallTests.class) -public class TestMasterProcedureQueue { - private static final Log LOG = LogFactory.getLog(TestMasterProcedureQueue.class); +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; - private MasterProcedureQueue queue; +@Category({MasterTests.class, SmallTests.class}) +public class TestMasterProcedureScheduler { + private static final Log LOG = LogFactory.getLog(TestMasterProcedureScheduler.class); + + private MasterProcedureScheduler queue; private Configuration conf; @Before public void setUp() throws IOException { conf = HBaseConfiguration.create(); - queue = new MasterProcedureQueue(conf, new TableLockManager.NullTableLockManager()); + queue = new MasterProcedureScheduler(conf, new TableLockManager.NullTableLockManager()); } @After @@ -65,7 +67,7 @@ public class TestMasterProcedureQueue { @Test public void testConcurrentCreateDelete() throws Exception { - final MasterProcedureQueue procQueue = queue; + final MasterProcedureScheduler procQueue = queue; final TableName table = TableName.valueOf("testtb"); final AtomicBoolean running = new AtomicBoolean(true); final AtomicBoolean failure = new AtomicBoolean(false); @@ -135,9 +137,14 @@ public class TestMasterProcedureQueue { for (int j = 1; j <= NUM_ITEMS; ++j) { for (int i = 1; i <= NUM_TABLES; ++i) { - Long procId = queue.poll(); + Procedure proc = queue.poll(); + assertTrue(proc != null); + TableName tableName = ((TestTableProcedure)proc).getTableName(); + queue.tryAcquireTableExclusiveLock(tableName, "test"); + queue.releaseTableExclusiveLock(tableName); + queue.completionCleanup(proc); assertEquals(--count, queue.size()); - assertEquals(i * 1000 + j, procId.longValue()); + assertEquals(i * 1000 + j, proc.getProcId()); } } assertEquals(0, queue.size()); @@ -164,7 +171,7 @@ public class TestMasterProcedureQueue { assertFalse(queue.markTableAsDeleted(tableName)); // fetch item and take a lock - assertEquals(1, queue.poll().longValue()); + assertEquals(1, queue.poll().getProcId()); // take the xlock assertTrue(queue.tryAcquireTableExclusiveLock(tableName, "write")); // table can't be deleted because we have the lock @@ -195,7 +202,7 @@ public class TestMasterProcedureQueue { for (int i = 1; i <= nitems; ++i) { // fetch item and take a lock - assertEquals(i, queue.poll().longValue()); + assertEquals(i, queue.poll().getProcId()); // take the rlock assertTrue(queue.tryAcquireTableSharedLock(tableName, "read " + i)); // table can't be deleted because we have locks and/or items in the queue @@ -233,24 +240,24 @@ public class TestMasterProcedureQueue { TableProcedureInterface.TableOperationType.READ)); // Fetch the 1st item and take the write lock - Long procId = queue.poll(); - assertEquals(1, procId.longValue()); + long procId = queue.poll().getProcId(); + assertEquals(1, procId); assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); // Fetch the 2nd item and verify that the lock can't be acquired - assertEquals(null, queue.poll()); + assertEquals(null, queue.poll(0)); // Release the write lock and acquire the read lock queue.releaseTableExclusiveLock(tableName); // Fetch the 2nd item and take the read lock - procId = queue.poll(); - assertEquals(2, procId.longValue()); + procId = queue.poll().getProcId(); + assertEquals(2, procId); assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); // Fetch the 3rd item and verify that the lock can't be acquired - procId = queue.poll(); - assertEquals(3, procId.longValue()); + procId = queue.poll().getProcId(); + assertEquals(3, procId); assertEquals(false, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); // release the rdlock of item 2 and take the wrlock for the 3d item @@ -258,19 +265,19 @@ public class TestMasterProcedureQueue { assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); // Fetch 4th item and verify that the lock can't be acquired - assertEquals(null, queue.poll()); + assertEquals(null, queue.poll(0)); // Release the write lock and acquire the read lock queue.releaseTableExclusiveLock(tableName); // Fetch the 4th item and take the read lock - procId = queue.poll(); - assertEquals(4, procId.longValue()); + procId = queue.poll().getProcId(); + assertEquals(4, procId); assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); // Fetch the 4th item and take the read lock - procId = queue.poll(); - assertEquals(5, procId.longValue()); + procId = queue.poll().getProcId(); + assertEquals(5, procId); assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); // Release 4th and 5th read-lock @@ -370,11 +377,11 @@ public class TestMasterProcedureQueue { } public static class TestTableProcSet { - private final MasterProcedureQueue queue; + private final MasterProcedureScheduler queue; private Map procsMap = new ConcurrentHashMap(); - public TestTableProcSet(final MasterProcedureQueue queue) { + public TestTableProcSet(final MasterProcedureScheduler queue) { this.queue = queue; } @@ -394,8 +401,8 @@ public class TestMasterProcedureQueue { TableProcedureInterface proc = null; boolean avail = false; while (!avail) { - Long procId = queue.poll(); - proc = procId != null ? procsMap.remove(procId) : null; + Procedure xProc = queue.poll(); + proc = xProc != null ? procsMap.remove(xProc.getProcId()) : null; if (proc == null) break; switch (proc.getTableOperationType()) { case CREATE: @@ -411,7 +418,7 @@ public class TestMasterProcedureQueue { } if (!avail) { addFront(proc); - LOG.debug("yield procId=" + procId); + LOG.debug("yield procId=" + proc); } } return proc;