HBASE-14837 Procedure v2 - Procedure Queue Improvement

This commit is contained in:
Matteo Bertozzi 2016-01-14 08:29:10 -08:00
parent dc57996ca6
commit 18a48af242
26 changed files with 1562 additions and 1001 deletions

View File

@ -224,10 +224,10 @@ public class ProcedureInfo implements Cloneable {
procProto.getOwner(), procProto.getOwner(),
procProto.getState(), procProto.getState(),
procProto.hasParentId() ? procProto.getParentId() : -1, procProto.hasParentId() ? procProto.getParentId() : -1,
procProto.getState() == ProcedureState.ROLLEDBACK ? procProto.getException() : null, procProto.hasException() ? procProto.getException() : null,
procProto.getLastUpdate(), procProto.getLastUpdate(),
procProto.getStartTime(), procProto.getStartTime(),
procProto.getState() == ProcedureState.FINISHED ? procProto.getResult().toByteArray() : null); procProto.hasResult() ? procProto.getResult().toByteArray() : null);
} }
/** /**

View File

@ -785,8 +785,7 @@ public class ProcedureExecutor<TEnvironment> {
*/ */
private void execLoop() { private void execLoop() {
while (isRunning()) { while (isRunning()) {
Long procId = runnables.poll(); Procedure proc = runnables.poll();
Procedure proc = procId != null ? procedures.get(procId) : null;
if (proc == null) continue; if (proc == null) continue;
try { try {

View File

@ -1,174 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* This class is a container of queues that allows to select a queue
* in a round robin fashion, considering priority of the queue.
*
* the quantum is just how many poll() will return the same object.
* e.g. if quantum is 1 and you have A and B as object you'll get: A B A B
* e.g. if quantum is 2 and you have A and B as object you'll get: A A B B A A B B
* then the object priority is just a priority * quantum
*
* Example:
* - three queues (A, B, C) with priorities (1, 1, 2)
* - The first poll() will return A
* - The second poll() will return B
* - The third and forth poll() will return C
* - and so on again and again.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ProcedureFairRunQueues<TKey, TQueue extends ProcedureFairRunQueues.FairObject> {
private ConcurrentSkipListMap<TKey, TQueue> objMap =
new ConcurrentSkipListMap<TKey, TQueue>();
private final ReentrantLock lock = new ReentrantLock();
private final int quantum;
private Map.Entry<TKey, TQueue> current = null;
private int currentQuantum = 0;
public interface FairObject {
boolean isAvailable();
int getPriority();
}
/**
* @param quantum how many poll() will return the same object.
*/
public ProcedureFairRunQueues(final int quantum) {
this.quantum = quantum;
}
public TQueue get(final TKey key) {
return objMap.get(key);
}
public TQueue add(final TKey key, final TQueue queue) {
TQueue oldq = objMap.putIfAbsent(key, queue);
return oldq != null ? oldq : queue;
}
public TQueue remove(final TKey key) {
TQueue queue = objMap.get(key);
if (queue != null) {
lock.lock();
try {
queue = objMap.remove(key);
if (current != null && queue == current.getValue()) {
currentQuantum = 0;
current = null;
}
} finally {
lock.unlock();
}
}
return queue;
}
public void clear() {
lock.lock();
try {
currentQuantum = 0;
current = null;
objMap.clear();
} finally {
lock.unlock();
}
}
/**
* @return the next available item if present
*/
public TQueue poll() {
lock.lock();
try {
TQueue queue;
if (currentQuantum == 0) {
if (nextObject() == null) {
// nothing here
return null;
}
queue = current.getValue();
currentQuantum = calculateQuantum(queue) - 1;
} else {
currentQuantum--;
queue = current.getValue();
}
if (!queue.isAvailable()) {
Map.Entry<TKey, TQueue> last = current;
// Try the next one
do {
if (nextObject() == null)
return null;
} while (current.getValue() != last.getValue() && !current.getValue().isAvailable());
queue = current.getValue();
currentQuantum = calculateQuantum(queue) - 1;
}
return queue;
} finally {
lock.unlock();
}
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append('{');
for (Map.Entry<TKey, TQueue> entry: objMap.entrySet()) {
builder.append(entry.getKey());
builder.append(':');
builder.append(entry.getValue());
}
builder.append('}');
return builder.toString();
}
private Map.Entry<TKey, TQueue> nextObject() {
Map.Entry<TKey, TQueue> next = null;
// If we have already a key, try the next one
if (current != null) {
next = objMap.higherEntry(current.getKey());
}
// if there is no higher key, go back to the first
current = (next != null) ? next : objMap.firstEntry();
return current;
}
private int calculateQuantum(final TQueue fairObject) {
// TODO
return Math.max(1, fairObject.getPriority() * quantum);
}
}

View File

@ -55,9 +55,9 @@ public interface ProcedureRunnableSet {
/** /**
* Fetch one Procedure from the queue * Fetch one Procedure from the queue
* @return the Procedure ID to execute, or null if nothing present. * @return the Procedure to execute, or null if nothing present.
*/ */
Long poll(); Procedure poll();
/** /**
* In case the class is blocking on poll() waiting for items to be added, * In case the class is blocking on poll() waiting for items to be added,

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class ProcedureSimpleRunQueue implements ProcedureRunnableSet { public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
private final Deque<Long> runnables = new ArrayDeque<Long>(); private final Deque<Procedure> runnables = new ArrayDeque<Procedure>();
private final ReentrantLock lock = new ReentrantLock(); private final ReentrantLock lock = new ReentrantLock();
private final Condition waitCond = lock.newCondition(); private final Condition waitCond = lock.newCondition();
@ -40,7 +40,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
public void addFront(final Procedure proc) { public void addFront(final Procedure proc) {
lock.lock(); lock.lock();
try { try {
runnables.addFirst(proc.getProcId()); runnables.addFirst(proc);
waitCond.signal(); waitCond.signal();
} finally { } finally {
lock.unlock(); lock.unlock();
@ -51,7 +51,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
public void addBack(final Procedure proc) { public void addBack(final Procedure proc) {
lock.lock(); lock.lock();
try { try {
runnables.addLast(proc.getProcId()); runnables.addLast(proc);
waitCond.signal(); waitCond.signal();
} finally { } finally {
lock.unlock(); lock.unlock();
@ -65,7 +65,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
@Override @Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
public Long poll() { public Procedure poll() {
lock.lock(); lock.lock();
try { try {
if (runnables.isEmpty()) { if (runnables.isEmpty()) {

View File

@ -1,155 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
@Category({MasterTests.class, SmallTests.class})
public class TestProcedureFairRunQueues {
private static class TestRunQueue implements ProcedureFairRunQueues.FairObject {
private final int priority;
private final String name;
private boolean available = true;
public TestRunQueue(String name, int priority) {
this.name = name;
this.priority = priority;
}
@Override
public String toString() {
return name;
}
private void setAvailable(boolean available) {
this.available = available;
}
@Override
public boolean isAvailable() {
return available;
}
@Override
public int getPriority() {
return priority;
}
}
@Test
public void testEmptyFairQueues() throws Exception {
ProcedureFairRunQueues<String, TestRunQueue> fairq
= new ProcedureFairRunQueues<String, TestRunQueue>(1);
for (int i = 0; i < 3; ++i) {
assertEquals(null, fairq.poll());
}
}
@Test
public void testFairQueues() throws Exception {
ProcedureFairRunQueues<String, TestRunQueue> fairq
= new ProcedureFairRunQueues<String, TestRunQueue>(1);
TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
for (int i = 0; i < 3; ++i) {
assertEquals(a, fairq.poll());
assertEquals(b, fairq.poll());
assertEquals(m, fairq.poll());
assertEquals(m, fairq.poll());
}
}
@Test
public void testFairQueuesNotAvailable() throws Exception {
ProcedureFairRunQueues<String, TestRunQueue> fairq
= new ProcedureFairRunQueues<String, TestRunQueue>(1);
TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
// m is not available
m.setAvailable(false);
for (int i = 0; i < 3; ++i) {
assertEquals(a, fairq.poll());
assertEquals(b, fairq.poll());
}
// m is available
m.setAvailable(true);
for (int i = 0; i < 3; ++i) {
assertEquals(m, fairq.poll());
assertEquals(m, fairq.poll());
assertEquals(a, fairq.poll());
assertEquals(b, fairq.poll());
}
// b is not available
b.setAvailable(false);
for (int i = 0; i < 3; ++i) {
assertEquals(m, fairq.poll());
assertEquals(m, fairq.poll());
assertEquals(a, fairq.poll());
}
assertEquals(m, fairq.poll());
m.setAvailable(false);
// m should be fetched next, but is no longer available
assertEquals(a, fairq.poll());
assertEquals(a, fairq.poll());
b.setAvailable(true);
for (int i = 0; i < 3; ++i) {
assertEquals(b, fairq.poll());
assertEquals(a, fairq.poll());
}
}
@Test
public void testFairQueuesDelete() throws Exception {
ProcedureFairRunQueues<String, TestRunQueue> fairq
= new ProcedureFairRunQueues<String, TestRunQueue>(1);
TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
// Fetch A and then remove it
assertEquals(a, fairq.poll());
assertEquals(a, fairq.remove("A"));
// Fetch B and then remove it
assertEquals(b, fairq.poll());
assertEquals(b, fairq.remove("B"));
// Fetch M and then remove it
assertEquals(m, fairq.poll());
assertEquals(m, fairq.remove("M"));
// nothing left
assertEquals(null, fairq.poll());
}
}

View File

@ -104,6 +104,7 @@ import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
@ -277,14 +278,15 @@ public class HMaster extends HRegionServer implements MasterServices {
// flag set after we complete initialization once active, // flag set after we complete initialization once active,
// it is not private since it's used in unit tests // it is not private since it's used in unit tests
volatile boolean initialized = false; private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
// flag set after master services are started, // flag set after master services are started,
// initialization may have not completed yet. // initialization may have not completed yet.
volatile boolean serviceStarted = false; volatile boolean serviceStarted = false;
// flag set after we complete assignMeta. // flag set after we complete assignMeta.
private volatile boolean serverCrashProcessingEnabled = false; private final ProcedureEvent serverCrashProcessingEnabled =
new ProcedureEvent("server crash processing");
LoadBalancer balancer; LoadBalancer balancer;
private RegionNormalizer normalizer; private RegionNormalizer normalizer;
@ -781,8 +783,10 @@ public class HMaster extends HRegionServer implements MasterServices {
status.markComplete("Initialization successful"); status.markComplete("Initialization successful");
LOG.info("Master has completed initialization"); LOG.info("Master has completed initialization");
configurationManager.registerObserver(this.balancer); configurationManager.registerObserver(this.balancer);
// Set master as 'initialized'. // Set master as 'initialized'.
initialized = true; setInitialized(true);
// assign the meta replicas // assign the meta replicas
Set<ServerName> EMPTY_SET = new HashSet<ServerName>(); Set<ServerName> EMPTY_SET = new HashSet<ServerName>();
int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM, int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
@ -976,8 +980,8 @@ public class HMaster extends HRegionServer implements MasterServices {
// servers. This is required so that if meta is assigning to a server which dies after // servers. This is required so that if meta is assigning to a server which dies after
// assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be // assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be
// stuck here waiting forever if waitForMeta is specified. // stuck here waiting forever if waitForMeta is specified.
if (!serverCrashProcessingEnabled) { if (!isServerCrashProcessingEnabled()) {
serverCrashProcessingEnabled = true; setServerCrashProcessingEnabled(true);
this.serverManager.processQueuedDeadServers(); this.serverManager.processQueuedDeadServers();
} }
@ -1207,7 +1211,7 @@ public class HMaster extends HRegionServer implements MasterServices {
public boolean balance(boolean force) throws IOException { public boolean balance(boolean force) throws IOException {
// if master not initialized, don't run balancer. // if master not initialized, don't run balancer.
if (!this.initialized) { if (!isInitialized()) {
LOG.debug("Master has not been initialized, don't run balancer."); LOG.debug("Master has not been initialized, don't run balancer.");
return false; return false;
} }
@ -1308,7 +1312,7 @@ public class HMaster extends HRegionServer implements MasterServices {
* is globally disabled) * is globally disabled)
*/ */
public boolean normalizeRegions() throws IOException { public boolean normalizeRegions() throws IOException {
if (!this.initialized) { if (!isInitialized()) {
LOG.debug("Master has not been initialized, don't run region normalizer."); LOG.debug("Master has not been initialized, don't run region normalizer.");
return false; return false;
} }
@ -1615,7 +1619,7 @@ public class HMaster extends HRegionServer implements MasterServices {
} }
} }
private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd) private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd)
throws IOException { throws IOException {
// FIFO compaction has some requirements // FIFO compaction has some requirements
// Actually FCP ignores periodic major compactions // Actually FCP ignores periodic major compactions
@ -1672,7 +1676,7 @@ public class HMaster extends HRegionServer implements MasterServices {
} }
} }
} }
// HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled. // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled.
private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey, private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
String message, Exception cause) throws IOException { String message, Exception cause) throws IOException {
@ -2300,6 +2304,15 @@ public class HMaster extends HRegionServer implements MasterServices {
*/ */
@Override @Override
public boolean isInitialized() { public boolean isInitialized() {
return initialized.isReady();
}
@VisibleForTesting
public void setInitialized(boolean isInitialized) {
procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
}
public ProcedureEvent getInitializedEvent() {
return initialized; return initialized;
} }
@ -2310,12 +2323,16 @@ public class HMaster extends HRegionServer implements MasterServices {
*/ */
@Override @Override
public boolean isServerCrashProcessingEnabled() { public boolean isServerCrashProcessingEnabled() {
return this.serverCrashProcessingEnabled; return serverCrashProcessingEnabled.isReady();
} }
@VisibleForTesting @VisibleForTesting
public void setServerCrashProcessingEnabled(final boolean b) { public void setServerCrashProcessingEnabled(final boolean b) {
this.serverCrashProcessingEnabled = b; procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
}
public ProcedureEvent getServerCrashProcessingEnabledEvent() {
return serverCrashProcessingEnabled;
} }
/** /**

View File

@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -185,10 +184,8 @@ public class AddColumnFamilyProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false; if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock( return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "add family");
tableName,
EventType.C_M_ADD_FAMILY.toString());
} }
@Override @Override

View File

@ -205,7 +205,9 @@ public class CreateNamespaceProcedure
return true; return true;
} }
return false; if (env.waitInitialized(this)) {
return false;
}
} }
return getTableNamespaceManager(env).acquireExclusiveLock(); return getTableNamespaceManager(env).acquireExclusiveLock();
} }

View File

@ -266,7 +266,7 @@ public class CreateTableProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized() && !getTableName().isSystemTable()) { if (!getTableName().isSystemTable() && env.waitInitialized(this)) {
return false; return false;
} }
return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "create table"); return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "create table");

View File

@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -202,10 +201,8 @@ public class DeleteColumnFamilyProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false; if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock( return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "delete family");
tableName,
EventType.C_M_DELETE_FAMILY.toString());
} }
@Override @Override

View File

@ -200,7 +200,7 @@ public class DeleteTableProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false; if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "delete table"); return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "delete table");
} }

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.constraint.ConstraintException; import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.BulkAssigner; import org.apache.hadoop.hbase.master.BulkAssigner;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@ -215,10 +214,8 @@ public class DisableTableProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false; if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock( return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "disable table");
tableName,
EventType.C_M_DISABLE_TABLE.toString());
} }
@Override @Override

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.BulkAssigner; import org.apache.hadoop.hbase.master.BulkAssigner;
import org.apache.hadoop.hbase.master.GeneralBulkAssigner; import org.apache.hadoop.hbase.master.GeneralBulkAssigner;
@ -235,10 +234,8 @@ public class EnableTableProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false; if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock( return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "enable table");
tableName,
EventType.C_M_ENABLE_TABLE.toString());
} }
@Override @Override

View File

@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
@ -85,12 +87,12 @@ public class MasterProcedureEnv {
} }
} }
private final MasterProcedureQueue procQueue; private final MasterProcedureScheduler procSched;
private final MasterServices master; private final MasterServices master;
public MasterProcedureEnv(final MasterServices master) { public MasterProcedureEnv(final MasterServices master) {
this.master = master; this.master = master;
this.procQueue = new MasterProcedureQueue(master.getConfiguration(), this.procSched = new MasterProcedureScheduler(master.getConfiguration(),
master.getTableLockManager()); master.getTableLockManager());
} }
@ -114,8 +116,8 @@ public class MasterProcedureEnv {
return master.getMasterCoprocessorHost(); return master.getMasterCoprocessorHost();
} }
public MasterProcedureQueue getProcedureQueue() { public MasterProcedureScheduler getProcedureQueue() {
return procQueue; return procSched;
} }
public boolean isRunning() { public boolean isRunning() {
@ -125,4 +127,28 @@ public class MasterProcedureEnv {
public boolean isInitialized() { public boolean isInitialized() {
return master.isInitialized(); return master.isInitialized();
} }
public boolean waitInitialized(Procedure proc) {
return procSched.waitEvent(((HMaster)master).getInitializedEvent(), proc);
}
public boolean waitServerCrashProcessingEnabled(Procedure proc) {
return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc);
}
public void wake(ProcedureEvent event) {
procSched.wake(event);
}
public void suspend(ProcedureEvent event) {
procSched.suspend(event);
}
public void setEventReady(ProcedureEvent event, boolean isReady) {
if (isReady) {
procSched.wake(event);
} else {
procSched.suspend(event);
}
}
} }

View File

@ -1,578 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureFairRunQueues;
import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
/**
* ProcedureRunnableSet for the Master Procedures.
* This RunnableSet tries to provide to the ProcedureExecutor procedures
* that can be executed without having to wait on a lock.
* Most of the master operations can be executed concurrently, if they
* are operating on different tables (e.g. two create table can be performed
* at the same, time assuming table A and table B) or against two different servers; say
* two servers that crashed at about the same time.
*
* <p>Each procedure should implement an interface providing information for this queue.
* for example table related procedures should implement TableProcedureInterface.
* each procedure will be pushed in its own queue, and based on the operation type
* we may take smarter decision. e.g. we can abort all the operations preceding
* a delete table, or similar.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MasterProcedureQueue implements ProcedureRunnableSet {
private static final Log LOG = LogFactory.getLog(MasterProcedureQueue.class);
// Two queues to ensure that server procedures run ahead of table precedures always.
private final ProcedureFairRunQueues<TableName, RunQueue> tableFairQ;
/**
* Rely on basic fair q. ServerCrashProcedure will yield if meta is not assigned. This way, the
* server that was carrying meta should rise to the top of the queue (this is how it used to
* work when we had handlers and ServerShutdownHandler ran). TODO: special handling of servers
* that were carrying system tables on crash; do I need to have these servers have priority?
*
* <p>Apart from the special-casing of meta and system tables, fairq is what we want
*/
private final ProcedureFairRunQueues<ServerName, RunQueue> serverFairQ;
private final ReentrantLock lock = new ReentrantLock();
private final Condition waitCond = lock.newCondition();
private final TableLockManager lockManager;
private final int metaTablePriority;
private final int userTablePriority;
private final int sysTablePriority;
private static final int DEFAULT_SERVER_PRIORITY = 1;
/**
* Keeps count across server and table queues.
*/
private int queueSize;
public MasterProcedureQueue(final Configuration conf, final TableLockManager lockManager) {
this.tableFairQ = new ProcedureFairRunQueues<TableName, RunQueue>(1);
this.serverFairQ = new ProcedureFairRunQueues<ServerName, RunQueue>(1);
this.lockManager = lockManager;
// TODO: should this be part of the HTD?
metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3);
sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2);
userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1);
}
@Override
public void addFront(final Procedure proc) {
lock.lock();
try {
getRunQueueOrCreate(proc).addFront(proc);
queueSize++;
waitCond.signal();
} finally {
lock.unlock();
}
}
@Override
public void addBack(final Procedure proc) {
lock.lock();
try {
getRunQueueOrCreate(proc).addBack(proc);
queueSize++;
waitCond.signal();
} finally {
lock.unlock();
}
}
@Override
public void yield(final Procedure proc) {
addBack(proc);
}
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
public Long poll() {
Long pollResult = null;
lock.lock();
try {
if (queueSize == 0) {
waitCond.await();
if (queueSize == 0) {
return null;
}
}
// For now, let server handling have precedence over table handling; presumption is that it
// is more important handling crashed servers than it is running the
// enabling/disabling tables, etc.
pollResult = doPoll(serverFairQ.poll());
if (pollResult == null) {
pollResult = doPoll(tableFairQ.poll());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
return pollResult;
}
private Long doPoll(final RunQueue rq) {
if (rq == null || !rq.isAvailable()) return null;
this.queueSize--;
return rq.poll();
}
@Override
public void signalAll() {
lock.lock();
try {
waitCond.signalAll();
} finally {
lock.unlock();
}
}
@Override
public void clear() {
lock.lock();
try {
serverFairQ.clear();
tableFairQ.clear();
queueSize = 0;
} finally {
lock.unlock();
}
}
@Override
public int size() {
lock.lock();
try {
return queueSize;
} finally {
lock.unlock();
}
}
@Override
public String toString() {
lock.lock();
try {
return "MasterProcedureQueue size=" + queueSize + ": tableFairQ: " + tableFairQ +
", serverFairQ: " + serverFairQ;
} finally {
lock.unlock();
}
}
@Override
public void completionCleanup(Procedure proc) {
if (proc instanceof TableProcedureInterface) {
TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
boolean tableDeleted;
if (proc.hasException()) {
IOException procEx = proc.getException().unwrapRemoteException();
if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
// create failed because the table already exist
tableDeleted = !(procEx instanceof TableExistsException);
} else {
// the operation failed because the table does not exist
tableDeleted = (procEx instanceof TableNotFoundException);
}
} else {
// the table was deleted
tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
}
if (tableDeleted) {
markTableAsDeleted(iProcTable.getTableName());
}
}
// No cleanup for ServerProcedureInterface types, yet.
}
private RunQueue getRunQueueOrCreate(final Procedure proc) {
if (proc instanceof TableProcedureInterface) {
final TableName table = ((TableProcedureInterface)proc).getTableName();
return getRunQueueOrCreate(table);
}
if (proc instanceof ServerProcedureInterface) {
return getRunQueueOrCreate((ServerProcedureInterface)proc);
}
// TODO: at the moment we only have Table and Server procedures
// if you are implementing a non-table/non-server procedure, you have two options: create
// a group for all the non-table/non-server procedures or try to find a key for your
// non-table/non-server procedures and implement something similar to the TableRunQueue.
throw new UnsupportedOperationException("RQs for non-table procedures are not implemented yet");
}
private TableRunQueue getRunQueueOrCreate(final TableName table) {
final TableRunQueue queue = getRunQueue(table);
if (queue != null) return queue;
return (TableRunQueue)tableFairQ.add(table, createTableRunQueue(table));
}
private ServerRunQueue getRunQueueOrCreate(final ServerProcedureInterface spi) {
final ServerRunQueue queue = getRunQueue(spi.getServerName());
if (queue != null) return queue;
return (ServerRunQueue)serverFairQ.add(spi.getServerName(), createServerRunQueue(spi));
}
private TableRunQueue createTableRunQueue(final TableName table) {
int priority = userTablePriority;
if (table.equals(TableName.META_TABLE_NAME)) {
priority = metaTablePriority;
} else if (table.isSystemTable()) {
priority = sysTablePriority;
}
return new TableRunQueue(priority);
}
private ServerRunQueue createServerRunQueue(final ServerProcedureInterface spi) {
return new ServerRunQueue(DEFAULT_SERVER_PRIORITY);
}
private TableRunQueue getRunQueue(final TableName table) {
return (TableRunQueue)tableFairQ.get(table);
}
private ServerRunQueue getRunQueue(final ServerName sn) {
return (ServerRunQueue)serverFairQ.get(sn);
}
/**
* Try to acquire the write lock on the specified table.
* other operations in the table-queue will be executed after the lock is released.
* @param table Table to lock
* @param purpose Human readable reason for locking the table
* @return true if we were able to acquire the lock on the table, otherwise false.
*/
public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) {
return getRunQueueOrCreate(table).tryExclusiveLock(lockManager, table, purpose);
}
/**
* Release the write lock taken with tryAcquireTableWrite()
* @param table the name of the table that has the write lock
*/
public void releaseTableExclusiveLock(final TableName table) {
getRunQueue(table).releaseExclusiveLock(lockManager, table);
}
/**
* Try to acquire the read lock on the specified table.
* other read operations in the table-queue may be executed concurrently,
* otherwise they have to wait until all the read-locks are released.
* @param table Table to lock
* @param purpose Human readable reason for locking the table
* @return true if we were able to acquire the lock on the table, otherwise false.
*/
public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) {
return getRunQueueOrCreate(table).trySharedLock(lockManager, table, purpose);
}
/**
* Release the read lock taken with tryAcquireTableRead()
* @param table the name of the table that has the read lock
*/
public void releaseTableSharedLock(final TableName table) {
getRunQueue(table).releaseSharedLock(lockManager, table);
}
/**
* Try to acquire the write lock on the specified server.
* @see #releaseServerExclusiveLock(ServerProcedureInterface)
* @param spi Server to lock
* @return true if we were able to acquire the lock on the server, otherwise false.
*/
public boolean tryAcquireServerExclusiveLock(final ServerProcedureInterface spi) {
return getRunQueueOrCreate(spi).tryExclusiveLock();
}
/**
* Release the write lock
* @see #tryAcquireServerExclusiveLock(ServerProcedureInterface)
* @param spi the server that has the write lock
*/
public void releaseServerExclusiveLock(final ServerProcedureInterface spi) {
getRunQueue(spi.getServerName()).releaseExclusiveLock();
}
/**
* Try to acquire the read lock on the specified server.
* @see #releaseServerSharedLock(ServerProcedureInterface)
* @param spi Server to lock
* @return true if we were able to acquire the lock on the server, otherwise false.
*/
public boolean tryAcquireServerSharedLock(final ServerProcedureInterface spi) {
return getRunQueueOrCreate(spi).trySharedLock();
}
/**
* Release the read lock taken
* @see #tryAcquireServerSharedLock(ServerProcedureInterface)
* @param spi the server that has the read lock
*/
public void releaseServerSharedLock(final ServerProcedureInterface spi) {
getRunQueue(spi.getServerName()).releaseSharedLock();
}
/**
* Tries to remove the queue and the table-lock of the specified table.
* If there are new operations pending (e.g. a new create),
* the remove will not be performed.
* @param table the name of the table that should be marked as deleted
* @return true if deletion succeeded, false otherwise meaning that there are
* other new operations pending for that table (e.g. a new create).
*/
protected boolean markTableAsDeleted(final TableName table) {
TableRunQueue queue = getRunQueue(table);
if (queue != null) {
lock.lock();
try {
if (queue.isEmpty() && queue.acquireDeleteLock()) {
tableFairQ.remove(table);
// Remove the table lock
try {
lockManager.tableDeleted(table);
} catch (IOException e) {
LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical
}
} else {
// TODO: If there are no create, we can drop all the other ops
return false;
}
} finally {
lock.unlock();
}
}
return true;
}
private interface RunQueue extends ProcedureFairRunQueues.FairObject {
void addFront(Procedure proc);
void addBack(Procedure proc);
Long poll();
boolean acquireDeleteLock();
}
/**
* Base abstract class for RunQueue implementations.
* Be careful honoring synchronizations in subclasses. In here we protect access but if you are
* acting on a state found in here, be sure dependent code keeps synchronization.
* Implements basic in-memory read/write locking mechanism to prevent procedure steps being run
* in parallel.
*/
private static abstract class AbstractRunQueue implements RunQueue {
// All modification of runnables happens with #lock held.
private final Deque<Long> runnables = new ArrayDeque<Long>();
private final int priority;
private boolean exclusiveLock = false;
private int sharedLock = 0;
public AbstractRunQueue(int priority) {
this.priority = priority;
}
boolean isEmpty() {
return this.runnables.isEmpty();
}
@Override
public boolean isAvailable() {
synchronized (this) {
return !exclusiveLock && !runnables.isEmpty();
}
}
@Override
public int getPriority() {
return this.priority;
}
@Override
public void addFront(Procedure proc) {
this.runnables.addFirst(proc.getProcId());
}
@Override
public void addBack(Procedure proc) {
this.runnables.addLast(proc.getProcId());
}
@Override
public Long poll() {
return this.runnables.poll();
}
@Override
public synchronized boolean acquireDeleteLock() {
return tryExclusiveLock();
}
public synchronized boolean isLocked() {
return isExclusiveLock() || sharedLock > 0;
}
public synchronized boolean isExclusiveLock() {
return this.exclusiveLock;
}
public synchronized boolean trySharedLock() {
if (isExclusiveLock()) return false;
sharedLock++;
return true;
}
public synchronized void releaseSharedLock() {
sharedLock--;
}
/**
* @return True if only one instance of a shared lock outstanding.
*/
synchronized boolean isSingleSharedLock() {
return sharedLock == 1;
}
public synchronized boolean tryExclusiveLock() {
if (isLocked()) return false;
exclusiveLock = true;
return true;
}
public synchronized void releaseExclusiveLock() {
exclusiveLock = false;
}
@Override
public String toString() {
return this.runnables.toString();
}
}
/**
* Run Queue for Server procedures.
*/
private static class ServerRunQueue extends AbstractRunQueue {
public ServerRunQueue(int priority) {
super(priority);
}
}
/**
* Run Queue for a Table. It contains a read-write lock that is used by the
* MasterProcedureQueue to decide if we should fetch an item from this queue
* or skip to another one which will be able to run without waiting for locks.
*/
private static class TableRunQueue extends AbstractRunQueue {
private TableLock tableLock = null;
public TableRunQueue(int priority) {
super(priority);
}
// TODO: Improve run-queue push with TableProcedureInterface.getType()
// we can take smart decisions based on the type of the operation (e.g. create/delete)
@Override
public void addBack(final Procedure proc) {
super.addBack(proc);
}
public synchronized boolean trySharedLock(final TableLockManager lockManager,
final TableName tableName, final String purpose) {
if (isExclusiveLock()) return false;
// Take zk-read-lock
tableLock = lockManager.readLock(tableName, purpose);
try {
tableLock.acquire();
} catch (IOException e) {
LOG.error("failed acquire read lock on " + tableName, e);
tableLock = null;
return false;
}
trySharedLock();
return true;
}
public synchronized void releaseSharedLock(final TableLockManager lockManager,
final TableName tableName) {
releaseTableLock(lockManager, isSingleSharedLock());
releaseSharedLock();
}
public synchronized boolean tryExclusiveLock(final TableLockManager lockManager,
final TableName tableName, final String purpose) {
if (isLocked()) return false;
// Take zk-write-lock
tableLock = lockManager.writeLock(tableName, purpose);
try {
tableLock.acquire();
} catch (IOException e) {
LOG.error("failed acquire write lock on " + tableName, e);
tableLock = null;
return false;
}
tryExclusiveLock();
return true;
}
public synchronized void releaseExclusiveLock(final TableLockManager lockManager,
final TableName tableName) {
releaseTableLock(lockManager, true);
releaseExclusiveLock();
}
private void releaseTableLock(final TableLockManager lockManager, boolean reset) {
for (int i = 0; i < 3; ++i) {
try {
tableLock.release();
if (reset) {
tableLock = null;
}
break;
} catch (IOException e) {
LOG.warn("Could not release the table write-lock", e);
}
}
}
}
}

View File

@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -182,10 +181,8 @@ public class ModifyColumnFamilyProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false; if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock( return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "modify family");
tableName,
EventType.C_M_MODIFY_FAMILY.toString());
} }
@Override @Override

View File

@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
@ -215,10 +214,8 @@ public class ModifyTableProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false; if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock( return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "modify table");
getTableName(),
EventType.C_M_MODIFY_TABLE.toString());
} }
@Override @Override

View File

@ -553,13 +553,13 @@ implements ServerProcedureInterface {
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.getMasterServices().isServerCrashProcessingEnabled()) return false; if (env.waitServerCrashProcessingEnabled(this)) return false;
return env.getProcedureQueue().tryAcquireServerExclusiveLock(this); return env.getProcedureQueue().tryAcquireServerExclusiveLock(getServerName());
} }
@Override @Override
protected void releaseLock(final MasterProcedureEnv env) { protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseServerExclusiveLock(this); env.getProcedureQueue().releaseServerExclusiveLock(getServerName());
} }
@Override @Override
@ -751,6 +751,11 @@ implements ServerProcedureInterface {
return this.carryingMeta; return this.carryingMeta;
} }
@Override
public ServerOperationType getServerOperationType() {
return ServerOperationType.CRASH_HANDLER;
}
/** /**
* For this procedure, yield at end of each successful flow step so that all crashed servers * For this procedure, yield at end of each successful flow step so that all crashed servers
* can make progress rather than do the default which has each procedure running to completion * can make progress rather than do the default which has each procedure running to completion

View File

@ -28,6 +28,10 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public interface ServerProcedureInterface { public interface ServerProcedureInterface {
public enum ServerOperationType {
CRASH_HANDLER
};
/** /**
* @return Name of this server instance. * @return Name of this server instance.
*/ */
@ -37,4 +41,12 @@ public interface ServerProcedureInterface {
* @return True if this server has an hbase:meta table region. * @return True if this server has an hbase:meta table region.
*/ */
boolean hasMetaTableRegion(); boolean hasMetaTableRegion();
}
/**
* Given an operation type we can take decisions about what to do with pending operations.
* e.g. if we get a crash handler and we have some assignment operation pending
* we can abort those operations.
* @return the operation type that the procedure is executing.
*/
ServerOperationType getServerOperationType();
}

View File

@ -182,7 +182,7 @@ public class TruncateTableProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false; if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "truncate table"); return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "truncate table");
} }

View File

@ -128,14 +128,14 @@ public class TestMaster {
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
HMaster m = cluster.getMaster(); HMaster m = cluster.getMaster();
try { try {
m.initialized = false; // fake it, set back later m.setInitialized(false); // fake it, set back later
HRegionInfo meta = HRegionInfo.FIRST_META_REGIONINFO; HRegionInfo meta = HRegionInfo.FIRST_META_REGIONINFO;
m.move(meta.getEncodedNameAsBytes(), null); m.move(meta.getEncodedNameAsBytes(), null);
fail("Region should not be moved since master is not initialized"); fail("Region should not be moved since master is not initialized");
} catch (IOException ioe) { } catch (IOException ioe) {
assertTrue(ioe instanceof PleaseHoldException); assertTrue(ioe instanceof PleaseHoldException);
} finally { } finally {
m.initialized = true; m.setInitialized(true);
} }
} }
@ -172,13 +172,13 @@ public class TestMaster {
try { try {
List<HRegionInfo> tableRegions = admin.getTableRegions(tableName); List<HRegionInfo> tableRegions = admin.getTableRegions(tableName);
master.initialized = false; // fake it, set back later master.setInitialized(false); // fake it, set back later
admin.move(tableRegions.get(0).getEncodedNameAsBytes(), null); admin.move(tableRegions.get(0).getEncodedNameAsBytes(), null);
fail("Region should not be moved since master is not initialized"); fail("Region should not be moved since master is not initialized");
} catch (IOException ioe) { } catch (IOException ioe) {
assertTrue(StringUtils.stringifyException(ioe).contains("PleaseHoldException")); assertTrue(StringUtils.stringifyException(ioe).contains("PleaseHoldException"));
} finally { } finally {
master.initialized = true; master.setInitialized(true);
TEST_UTIL.deleteTable(tableName); TEST_UTIL.deleteTable(tableName);
} }
} }

View File

@ -306,7 +306,7 @@ public class TestMasterNoCluster {
try { try {
// Wait till master is initialized. // Wait till master is initialized.
while (!master.initialized) Threads.sleep(10); while (!master.isInitialized()) Threads.sleep(10);
LOG.info("Master is initialized"); LOG.info("Master is initialized");
assertFalse("The dead server should not be pulled in", assertFalse("The dead server should not be pulled in",

View File

@ -0,0 +1,179 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@Category({MasterTests.class, MediumTests.class})
public class TestMasterProcedureEvents {
private static final Log LOG = LogFactory.getLog(TestCreateTableProcedure.class);
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static long nonceGroup = HConstants.NO_NONCE;
private static long nonce = HConstants.NO_NONCE;
private static void setupConf(Configuration conf) {
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8);
conf.setBoolean("hbase.procedure.store.wal.use.hsync", false);
}
@BeforeClass
public static void setupCluster() throws Exception {
setupConf(UTIL.getConfiguration());
UTIL.startMiniCluster(3);
}
@AfterClass
public static void cleanupTest() throws Exception {
try {
UTIL.shutdownMiniCluster();
} catch (Exception e) {
LOG.warn("failure shutting down cluster", e);
}
}
@Test
public void testMasterInitializedEvent() throws Exception {
TableName tableName = TableName.valueOf("testMasterInitializedEvent");
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue();
HRegionInfo hri = new HRegionInfo(tableName);
HTableDescriptor htd = new HTableDescriptor(tableName);
HColumnDescriptor hcd = new HColumnDescriptor("f");
htd.addFamily(hcd);
while (!master.isInitialized()) Thread.sleep(250);
master.setInitialized(false); // fake it, set back later
CreateTableProcedure proc = new CreateTableProcedure(
procExec.getEnvironment(), htd, new HRegionInfo[] { hri });
long pollCalls = procSched.getPollCalls();
long nullPollCalls = procSched.getNullPollCalls();
long procId = procExec.submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
for (int i = 0; i < 10; ++i) {
Thread.sleep(100);
assertEquals(pollCalls + 1, procSched.getPollCalls());
assertEquals(nullPollCalls, procSched.getNullPollCalls());
}
master.setInitialized(true);
ProcedureTestingUtility.waitProcedure(procExec, procId);
assertEquals(pollCalls + 2, procSched.getPollCalls());
assertEquals(nullPollCalls, procSched.getNullPollCalls());
}
@Test
public void testServerCrashProcedureEvent() throws Exception {
TableName tableName = TableName.valueOf("testServerCrashProcedureEventTb");
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue();
while (!master.isServerCrashProcessingEnabled() || !master.isInitialized() ||
master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
Thread.sleep(25);
}
UTIL.createTable(tableName, HBaseTestingUtility.COLUMNS[0]);
try (Table t = UTIL.getConnection().getTable(tableName)) {
// Load the table with a bit of data so some logs to split and some edits in each region.
UTIL.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
}
master.setServerCrashProcessingEnabled(false); // fake it, set back later
long pollCalls = procSched.getPollCalls();
long nullPollCalls = procSched.getNullPollCalls();
// Kill a server. Master will notice but do nothing other than add it to list of dead servers.
HRegionServer hrs = getServerWithRegions();
boolean carryingMeta = master.getAssignmentManager().isCarryingMeta(hrs.getServerName());
UTIL.getHBaseCluster().killRegionServer(hrs.getServerName());
hrs.join();
// Wait until the expiration of the server has arrived at the master. We won't process it
// by queuing a ServerCrashProcedure because we have disabled crash processing... but wait
// here so ServerManager gets notice and adds expired server to appropriate queues.
while (!master.getServerManager().isServerDead(hrs.getServerName())) Thread.sleep(10);
// Do some of the master processing of dead servers so when SCP runs, it has expected 'state'.
master.getServerManager().moveFromOnelineToDeadServers(hrs.getServerName());
long procId = procExec.submitProcedure(
new ServerCrashProcedure(hrs.getServerName(), true, carryingMeta));
for (int i = 0; i < 10; ++i) {
Thread.sleep(100);
assertEquals(pollCalls + 1, procSched.getPollCalls());
assertEquals(nullPollCalls, procSched.getNullPollCalls());
}
// Now, reenable processing else we can't get a lock on the ServerCrashProcedure.
master.setServerCrashProcessingEnabled(true);
ProcedureTestingUtility.waitProcedure(procExec, procId);
LOG.debug("server crash processing poll calls: " + procSched.getPollCalls());
assertTrue(procSched.getPollCalls() >= (pollCalls + 2));
assertEquals(nullPollCalls, procSched.getNullPollCalls());
UTIL.deleteTable(tableName);
}
private HRegionServer getServerWithRegions() {
for (int i = 0; i < 3; ++i) {
HRegionServer hrs = UTIL.getHBaseCluster().getRegionServer(i);
if (hrs.getNumberOfOnlineRegions() > 0) {
return hrs;
}
}
return null;
}
}

View File

@ -50,16 +50,16 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@Category({MasterTests.class, SmallTests.class}) @Category({MasterTests.class, SmallTests.class})
public class TestMasterProcedureQueue { public class TestMasterProcedureScheduler {
private static final Log LOG = LogFactory.getLog(TestMasterProcedureQueue.class); private static final Log LOG = LogFactory.getLog(TestMasterProcedureScheduler.class);
private MasterProcedureQueue queue; private MasterProcedureScheduler queue;
private Configuration conf; private Configuration conf;
@Before @Before
public void setUp() throws IOException { public void setUp() throws IOException {
conf = HBaseConfiguration.create(); conf = HBaseConfiguration.create();
queue = new MasterProcedureQueue(conf, new TableLockManager.NullTableLockManager()); queue = new MasterProcedureScheduler(conf, new TableLockManager.NullTableLockManager());
} }
@After @After
@ -69,7 +69,7 @@ public class TestMasterProcedureQueue {
@Test @Test
public void testConcurrentCreateDelete() throws Exception { public void testConcurrentCreateDelete() throws Exception {
final MasterProcedureQueue procQueue = queue; final MasterProcedureScheduler procQueue = queue;
final TableName table = TableName.valueOf("testtb"); final TableName table = TableName.valueOf("testtb");
final AtomicBoolean running = new AtomicBoolean(true); final AtomicBoolean running = new AtomicBoolean(true);
final AtomicBoolean failure = new AtomicBoolean(false); final AtomicBoolean failure = new AtomicBoolean(false);
@ -139,9 +139,14 @@ public class TestMasterProcedureQueue {
for (int j = 1; j <= NUM_ITEMS; ++j) { for (int j = 1; j <= NUM_ITEMS; ++j) {
for (int i = 1; i <= NUM_TABLES; ++i) { for (int i = 1; i <= NUM_TABLES; ++i) {
Long procId = queue.poll(); Procedure proc = queue.poll();
assertTrue(proc != null);
TableName tableName = ((TestTableProcedure)proc).getTableName();
queue.tryAcquireTableExclusiveLock(tableName, "test");
queue.releaseTableExclusiveLock(tableName);
queue.completionCleanup(proc);
assertEquals(--count, queue.size()); assertEquals(--count, queue.size());
assertEquals(i * 1000 + j, procId.longValue()); assertEquals(i * 1000 + j, proc.getProcId());
} }
} }
assertEquals(0, queue.size()); assertEquals(0, queue.size());
@ -168,7 +173,7 @@ public class TestMasterProcedureQueue {
assertFalse(queue.markTableAsDeleted(tableName)); assertFalse(queue.markTableAsDeleted(tableName));
// fetch item and take a lock // fetch item and take a lock
assertEquals(1, queue.poll().longValue()); assertEquals(1, queue.poll().getProcId());
// take the xlock // take the xlock
assertTrue(queue.tryAcquireTableExclusiveLock(tableName, "write")); assertTrue(queue.tryAcquireTableExclusiveLock(tableName, "write"));
// table can't be deleted because we have the lock // table can't be deleted because we have the lock
@ -199,7 +204,7 @@ public class TestMasterProcedureQueue {
for (int i = 1; i <= nitems; ++i) { for (int i = 1; i <= nitems; ++i) {
// fetch item and take a lock // fetch item and take a lock
assertEquals(i, queue.poll().longValue()); assertEquals(i, queue.poll().getProcId());
// take the rlock // take the rlock
assertTrue(queue.tryAcquireTableSharedLock(tableName, "read " + i)); assertTrue(queue.tryAcquireTableSharedLock(tableName, "read " + i));
// table can't be deleted because we have locks and/or items in the queue // table can't be deleted because we have locks and/or items in the queue
@ -237,24 +242,24 @@ public class TestMasterProcedureQueue {
TableProcedureInterface.TableOperationType.READ)); TableProcedureInterface.TableOperationType.READ));
// Fetch the 1st item and take the write lock // Fetch the 1st item and take the write lock
Long procId = queue.poll(); long procId = queue.poll().getProcId();
assertEquals(1, procId.longValue()); assertEquals(1, procId);
assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
// Fetch the 2nd item and verify that the lock can't be acquired // Fetch the 2nd item and verify that the lock can't be acquired
assertEquals(null, queue.poll()); assertEquals(null, queue.poll(0));
// Release the write lock and acquire the read lock // Release the write lock and acquire the read lock
queue.releaseTableExclusiveLock(tableName); queue.releaseTableExclusiveLock(tableName);
// Fetch the 2nd item and take the read lock // Fetch the 2nd item and take the read lock
procId = queue.poll(); procId = queue.poll().getProcId();
assertEquals(2, procId.longValue()); assertEquals(2, procId);
assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
// Fetch the 3rd item and verify that the lock can't be acquired // Fetch the 3rd item and verify that the lock can't be acquired
procId = queue.poll(); procId = queue.poll().getProcId();
assertEquals(3, procId.longValue()); assertEquals(3, procId);
assertEquals(false, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); assertEquals(false, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
// release the rdlock of item 2 and take the wrlock for the 3d item // release the rdlock of item 2 and take the wrlock for the 3d item
@ -262,19 +267,19 @@ public class TestMasterProcedureQueue {
assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
// Fetch 4th item and verify that the lock can't be acquired // Fetch 4th item and verify that the lock can't be acquired
assertEquals(null, queue.poll()); assertEquals(null, queue.poll(0));
// Release the write lock and acquire the read lock // Release the write lock and acquire the read lock
queue.releaseTableExclusiveLock(tableName); queue.releaseTableExclusiveLock(tableName);
// Fetch the 4th item and take the read lock // Fetch the 4th item and take the read lock
procId = queue.poll(); procId = queue.poll().getProcId();
assertEquals(4, procId.longValue()); assertEquals(4, procId);
assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
// Fetch the 4th item and take the read lock // Fetch the 4th item and take the read lock
procId = queue.poll(); procId = queue.poll().getProcId();
assertEquals(5, procId.longValue()); assertEquals(5, procId);
assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
// Release 4th and 5th read-lock // Release 4th and 5th read-lock
@ -374,11 +379,11 @@ public class TestMasterProcedureQueue {
} }
public static class TestTableProcSet { public static class TestTableProcSet {
private final MasterProcedureQueue queue; private final MasterProcedureScheduler queue;
private Map<Long, TableProcedureInterface> procsMap = private Map<Long, TableProcedureInterface> procsMap =
new ConcurrentHashMap<Long, TableProcedureInterface>(); new ConcurrentHashMap<Long, TableProcedureInterface>();
public TestTableProcSet(final MasterProcedureQueue queue) { public TestTableProcSet(final MasterProcedureScheduler queue) {
this.queue = queue; this.queue = queue;
} }
@ -398,8 +403,8 @@ public class TestMasterProcedureQueue {
TableProcedureInterface proc = null; TableProcedureInterface proc = null;
boolean avail = false; boolean avail = false;
while (!avail) { while (!avail) {
Long procId = queue.poll(); Procedure xProc = queue.poll();
proc = procId != null ? procsMap.remove(procId) : null; proc = xProc != null ? procsMap.remove(xProc.getProcId()) : null;
if (proc == null) break; if (proc == null) break;
switch (proc.getTableOperationType()) { switch (proc.getTableOperationType()) {
case CREATE: case CREATE:
@ -415,7 +420,7 @@ public class TestMasterProcedureQueue {
} }
if (!avail) { if (!avail) {
addFront(proc); addFront(proc);
LOG.debug("yield procId=" + procId); LOG.debug("yield procId=" + proc);
} }
} }
return proc; return proc;