HBASE-14837 Procedure v2 - Procedure Queue Improvement

This commit is contained in:
Matteo Bertozzi 2016-01-14 08:29:10 -08:00
parent b3c5f09ee0
commit 3c2229a9a8
26 changed files with 1575 additions and 1009 deletions

View File

@ -222,10 +222,10 @@ public class ProcedureInfo {
procProto.getOwner(), procProto.getOwner(),
procProto.getState(), procProto.getState(),
procProto.hasParentId() ? procProto.getParentId() : -1, procProto.hasParentId() ? procProto.getParentId() : -1,
procProto.getState() == ProcedureState.ROLLEDBACK ? procProto.getException() : null, procProto.hasException() ? procProto.getException() : null,
procProto.getLastUpdate(), procProto.getLastUpdate(),
procProto.getStartTime(), procProto.getStartTime(),
procProto.getState() == ProcedureState.FINISHED ? procProto.getResult().toByteArray() : null); procProto.hasResult() ? procProto.getResult().toByteArray() : null);
} }
/** /**

View File

@ -785,8 +785,7 @@ public class ProcedureExecutor<TEnvironment> {
*/ */
private void execLoop() { private void execLoop() {
while (isRunning()) { while (isRunning()) {
Long procId = runnables.poll(); Procedure proc = runnables.poll();
Procedure proc = procId != null ? procedures.get(procId) : null;
if (proc == null) continue; if (proc == null) continue;
try { try {

View File

@ -1,174 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* This class is a container of queues that allows to select a queue
* in a round robin fashion, considering priority of the queue.
*
* the quantum is just how many poll() will return the same object.
* e.g. if quantum is 1 and you have A and B as object you'll get: A B A B
* e.g. if quantum is 2 and you have A and B as object you'll get: A A B B A A B B
* then the object priority is just a priority * quantum
*
* Example:
* - three queues (A, B, C) with priorities (1, 1, 2)
* - The first poll() will return A
* - The second poll() will return B
* - The third and forth poll() will return C
* - and so on again and again.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ProcedureFairRunQueues<TKey, TQueue extends ProcedureFairRunQueues.FairObject> {
private ConcurrentSkipListMap<TKey, TQueue> objMap =
new ConcurrentSkipListMap<TKey, TQueue>();
private final ReentrantLock lock = new ReentrantLock();
private final int quantum;
private Map.Entry<TKey, TQueue> current = null;
private int currentQuantum = 0;
public interface FairObject {
boolean isAvailable();
int getPriority();
}
/**
* @param quantum how many poll() will return the same object.
*/
public ProcedureFairRunQueues(final int quantum) {
this.quantum = quantum;
}
public TQueue get(final TKey key) {
return objMap.get(key);
}
public TQueue add(final TKey key, final TQueue queue) {
TQueue oldq = objMap.putIfAbsent(key, queue);
return oldq != null ? oldq : queue;
}
public TQueue remove(final TKey key) {
TQueue queue = objMap.get(key);
if (queue != null) {
lock.lock();
try {
queue = objMap.remove(key);
if (current != null && queue == current.getValue()) {
currentQuantum = 0;
current = null;
}
} finally {
lock.unlock();
}
}
return queue;
}
public void clear() {
lock.lock();
try {
currentQuantum = 0;
current = null;
objMap.clear();
} finally {
lock.unlock();
}
}
/**
* @return the next available item if present
*/
public TQueue poll() {
lock.lock();
try {
TQueue queue;
if (currentQuantum == 0) {
if (nextObject() == null) {
// nothing here
return null;
}
queue = current.getValue();
currentQuantum = calculateQuantum(queue) - 1;
} else {
currentQuantum--;
queue = current.getValue();
}
if (!queue.isAvailable()) {
Map.Entry<TKey, TQueue> last = current;
// Try the next one
do {
if (nextObject() == null)
return null;
} while (current.getValue() != last.getValue() && !current.getValue().isAvailable());
queue = current.getValue();
currentQuantum = calculateQuantum(queue) - 1;
}
return queue;
} finally {
lock.unlock();
}
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append('{');
for (Map.Entry<TKey, TQueue> entry: objMap.entrySet()) {
builder.append(entry.getKey());
builder.append(':');
builder.append(entry.getValue());
}
builder.append('}');
return builder.toString();
}
private Map.Entry<TKey, TQueue> nextObject() {
Map.Entry<TKey, TQueue> next = null;
// If we have already a key, try the next one
if (current != null) {
next = objMap.higherEntry(current.getKey());
}
// if there is no higher key, go back to the first
current = (next != null) ? next : objMap.firstEntry();
return current;
}
private int calculateQuantum(final TQueue fairObject) {
// TODO
return Math.max(1, fairObject.getPriority() * quantum);
}
}

View File

@ -55,9 +55,9 @@ public interface ProcedureRunnableSet {
/** /**
* Fetch one Procedure from the queue * Fetch one Procedure from the queue
* @return the Procedure ID to execute, or null if nothing present. * @return the Procedure to execute, or null if nothing present.
*/ */
Long poll(); Procedure poll();
/** /**
* In case the class is blocking on poll() waiting for items to be added, * In case the class is blocking on poll() waiting for items to be added,

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class ProcedureSimpleRunQueue implements ProcedureRunnableSet { public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
private final Deque<Long> runnables = new ArrayDeque<Long>(); private final Deque<Procedure> runnables = new ArrayDeque<Procedure>();
private final ReentrantLock lock = new ReentrantLock(); private final ReentrantLock lock = new ReentrantLock();
private final Condition waitCond = lock.newCondition(); private final Condition waitCond = lock.newCondition();
@ -40,7 +40,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
public void addFront(final Procedure proc) { public void addFront(final Procedure proc) {
lock.lock(); lock.lock();
try { try {
runnables.addFirst(proc.getProcId()); runnables.addFirst(proc);
waitCond.signal(); waitCond.signal();
} finally { } finally {
lock.unlock(); lock.unlock();
@ -51,7 +51,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
public void addBack(final Procedure proc) { public void addBack(final Procedure proc) {
lock.lock(); lock.lock();
try { try {
runnables.addLast(proc.getProcId()); runnables.addLast(proc);
waitCond.signal(); waitCond.signal();
} finally { } finally {
lock.unlock(); lock.unlock();
@ -65,7 +65,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
@Override @Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
public Long poll() { public Procedure poll() {
lock.lock(); lock.lock();
try { try {
if (runnables.isEmpty()) { if (runnables.isEmpty()) {

View File

@ -1,154 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
@Category(SmallTests.class)
public class TestProcedureFairRunQueues {
private static class TestRunQueue implements ProcedureFairRunQueues.FairObject {
private final int priority;
private final String name;
private boolean available = true;
public TestRunQueue(String name, int priority) {
this.name = name;
this.priority = priority;
}
@Override
public String toString() {
return name;
}
private void setAvailable(boolean available) {
this.available = available;
}
@Override
public boolean isAvailable() {
return available;
}
@Override
public int getPriority() {
return priority;
}
}
@Test
public void testEmptyFairQueues() throws Exception {
ProcedureFairRunQueues<String, TestRunQueue> fairq
= new ProcedureFairRunQueues<String, TestRunQueue>(1);
for (int i = 0; i < 3; ++i) {
assertEquals(null, fairq.poll());
}
}
@Test
public void testFairQueues() throws Exception {
ProcedureFairRunQueues<String, TestRunQueue> fairq
= new ProcedureFairRunQueues<String, TestRunQueue>(1);
TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
for (int i = 0; i < 3; ++i) {
assertEquals(a, fairq.poll());
assertEquals(b, fairq.poll());
assertEquals(m, fairq.poll());
assertEquals(m, fairq.poll());
}
}
@Test
public void testFairQueuesNotAvailable() throws Exception {
ProcedureFairRunQueues<String, TestRunQueue> fairq
= new ProcedureFairRunQueues<String, TestRunQueue>(1);
TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
// m is not available
m.setAvailable(false);
for (int i = 0; i < 3; ++i) {
assertEquals(a, fairq.poll());
assertEquals(b, fairq.poll());
}
// m is available
m.setAvailable(true);
for (int i = 0; i < 3; ++i) {
assertEquals(m, fairq.poll());
assertEquals(m, fairq.poll());
assertEquals(a, fairq.poll());
assertEquals(b, fairq.poll());
}
// b is not available
b.setAvailable(false);
for (int i = 0; i < 3; ++i) {
assertEquals(m, fairq.poll());
assertEquals(m, fairq.poll());
assertEquals(a, fairq.poll());
}
assertEquals(m, fairq.poll());
m.setAvailable(false);
// m should be fetched next, but is no longer available
assertEquals(a, fairq.poll());
assertEquals(a, fairq.poll());
b.setAvailable(true);
for (int i = 0; i < 3; ++i) {
assertEquals(b, fairq.poll());
assertEquals(a, fairq.poll());
}
}
@Test
public void testFairQueuesDelete() throws Exception {
ProcedureFairRunQueues<String, TestRunQueue> fairq
= new ProcedureFairRunQueues<String, TestRunQueue>(1);
TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
// Fetch A and then remove it
assertEquals(a, fairq.poll());
assertEquals(a, fairq.remove("A"));
// Fetch B and then remove it
assertEquals(b, fairq.poll());
assertEquals(b, fairq.remove("B"));
// Fetch M and then remove it
assertEquals(m, fairq.poll());
assertEquals(m, fairq.remove("M"));
// nothing left
assertEquals(null, fairq.poll());
}
}

View File

@ -107,6 +107,7 @@ import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
import org.apache.hadoop.hbase.master.procedure.ModifyNamespaceProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyNamespaceProcedure;
import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
@ -280,14 +281,15 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
// flag set after we complete initialization once active, // flag set after we complete initialization once active,
// it is not private since it's used in unit tests // it is not private since it's used in unit tests
volatile boolean initialized = false; private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
// flag set after master services are started, // flag set after master services are started,
// initialization may have not completed yet. // initialization may have not completed yet.
volatile boolean serviceStarted = false; volatile boolean serviceStarted = false;
// flag set after we complete assignMeta. // flag set after we complete assignMeta.
private volatile boolean serverCrashProcessingEnabled = false; private final ProcedureEvent serverCrashProcessingEnabled =
new ProcedureEvent("server crash processing");
LoadBalancer balancer; LoadBalancer balancer;
private RegionNormalizer normalizer; private RegionNormalizer normalizer;
@ -783,7 +785,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
configurationManager.registerObserver(this.balancer); configurationManager.registerObserver(this.balancer);
// Set master as 'initialized'. // Set master as 'initialized'.
initialized = true; setInitialized(true);
status.setStatus("Starting quota manager"); status.setStatus("Starting quota manager");
initQuotaManager(); initQuotaManager();
@ -1002,8 +1004,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
// servers. This is required so that if meta is assigning to a server which dies after // servers. This is required so that if meta is assigning to a server which dies after
// assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be // assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be
// stuck here waiting forever if waitForMeta is specified. // stuck here waiting forever if waitForMeta is specified.
if (!serverCrashProcessingEnabled) { if (!isServerCrashProcessingEnabled()) {
serverCrashProcessingEnabled = true; setServerCrashProcessingEnabled(true);
this.serverManager.processQueuedDeadServers(); this.serverManager.processQueuedDeadServers();
} }
@ -1240,7 +1242,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
public boolean balance(boolean force) throws IOException { public boolean balance(boolean force) throws IOException {
// if master not initialized, don't run balancer. // if master not initialized, don't run balancer.
if (!this.initialized) { if (!isInitialized()) {
LOG.debug("Master has not been initialized, don't run balancer."); LOG.debug("Master has not been initialized, don't run balancer.");
return false; return false;
} }
@ -1337,7 +1339,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
* @throws CoordinatedStateException * @throws CoordinatedStateException
*/ */
public boolean normalizeRegions() throws IOException, CoordinatedStateException { public boolean normalizeRegions() throws IOException, CoordinatedStateException {
if (!this.initialized) { if (!isInitialized()) {
LOG.debug("Master has not been initialized, don't run region normalizer."); LOG.debug("Master has not been initialized, don't run region normalizer.");
return false; return false;
} }
@ -2301,7 +2303,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException { void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException {
checkServiceStarted(); checkServiceStarted();
if (!this.initialized) { if (!isInitialized()) {
throw new PleaseHoldException("Master is initializing"); throw new PleaseHoldException("Master is initializing");
} }
} }
@ -2336,6 +2338,15 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
*/ */
@Override @Override
public boolean isInitialized() { public boolean isInitialized() {
return initialized.isReady();
}
@VisibleForTesting
public void setInitialized(boolean isInitialized) {
procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
}
public ProcedureEvent getInitializedEvent() {
return initialized; return initialized;
} }
@ -2346,12 +2357,16 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
*/ */
@Override @Override
public boolean isServerCrashProcessingEnabled() { public boolean isServerCrashProcessingEnabled() {
return this.serverCrashProcessingEnabled; return serverCrashProcessingEnabled.isReady();
} }
@VisibleForTesting @VisibleForTesting
public void setServerCrashProcessingEnabled(final boolean b) { public void setServerCrashProcessingEnabled(final boolean b) {
this.serverCrashProcessingEnabled = b; procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
}
public ProcedureEvent getServerCrashProcessingEnabledEvent() {
return serverCrashProcessingEnabled;
} }
/** /**

View File

@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -185,10 +184,8 @@ public class AddColumnFamilyProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false; if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock( return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "add family");
tableName,
EventType.C_M_ADD_FAMILY.toString());
} }
@Override @Override

View File

@ -205,7 +205,9 @@ public class CreateNamespaceProcedure
return true; return true;
} }
return false; if (env.waitInitialized(this)) {
return false;
}
} }
return getTableNamespaceManager(env).acquireExclusiveLock(); return getTableNamespaceManager(env).acquireExclusiveLock();
} }

View File

@ -270,7 +270,7 @@ public class CreateTableProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized() && !getTableName().isSystemTable()) { if (!getTableName().isSystemTable() && env.waitInitialized(this)) {
return false; return false;
} }
return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "create table"); return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "create table");

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -201,10 +200,8 @@ public class DeleteColumnFamilyProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false; if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock( return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "delete family");
tableName,
EventType.C_M_DELETE_FAMILY.toString());
} }
@Override @Override

View File

@ -198,7 +198,7 @@ public class DeleteTableProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false; if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "delete table"); return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "delete table");
} }

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.TableStateManager; import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.constraint.ConstraintException; import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.exceptions.HBaseException; import org.apache.hadoop.hbase.exceptions.HBaseException;
import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.BulkAssigner; import org.apache.hadoop.hbase.master.BulkAssigner;
@ -215,10 +214,8 @@ public class DisableTableProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false; if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock( return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "disable table");
tableName,
EventType.C_M_DISABLE_TABLE.toString());
} }
@Override @Override

View File

@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.TableStateManager; import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.exceptions.HBaseException; import org.apache.hadoop.hbase.exceptions.HBaseException;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.BulkAssigner; import org.apache.hadoop.hbase.master.BulkAssigner;
import org.apache.hadoop.hbase.master.GeneralBulkAssigner; import org.apache.hadoop.hbase.master.GeneralBulkAssigner;
@ -239,10 +239,8 @@ public class EnableTableProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false; if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock( return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "enable table");
tableName,
EventType.C_M_ENABLE_TABLE.toString());
} }
@Override @Override

View File

@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
@ -85,12 +87,12 @@ public class MasterProcedureEnv {
} }
} }
private final MasterProcedureQueue procQueue; private final MasterProcedureScheduler procSched;
private final MasterServices master; private final MasterServices master;
public MasterProcedureEnv(final MasterServices master) { public MasterProcedureEnv(final MasterServices master) {
this.master = master; this.master = master;
this.procQueue = new MasterProcedureQueue(master.getConfiguration(), this.procSched = new MasterProcedureScheduler(master.getConfiguration(),
master.getTableLockManager()); master.getTableLockManager());
} }
@ -114,8 +116,8 @@ public class MasterProcedureEnv {
return master.getMasterCoprocessorHost(); return master.getMasterCoprocessorHost();
} }
public MasterProcedureQueue getProcedureQueue() { public MasterProcedureScheduler getProcedureQueue() {
return procQueue; return procSched;
} }
public boolean isRunning() { public boolean isRunning() {
@ -125,4 +127,28 @@ public class MasterProcedureEnv {
public boolean isInitialized() { public boolean isInitialized() {
return master.isInitialized(); return master.isInitialized();
} }
public boolean waitInitialized(Procedure proc) {
return procSched.waitEvent(((HMaster)master).getInitializedEvent(), proc);
}
public boolean waitServerCrashProcessingEnabled(Procedure proc) {
return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc);
}
public void wake(ProcedureEvent event) {
procSched.wake(event);
}
public void suspend(ProcedureEvent event) {
procSched.suspend(event);
}
public void setEventReady(ProcedureEvent event, boolean isReady) {
if (isReady) {
procSched.wake(event);
} else {
procSched.suspend(event);
}
}
} }

View File

@ -1,578 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureFairRunQueues;
import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
/**
* ProcedureRunnableSet for the Master Procedures.
* This RunnableSet tries to provide to the ProcedureExecutor procedures
* that can be executed without having to wait on a lock.
* Most of the master operations can be executed concurrently, if they
* are operating on different tables (e.g. two create table can be performed
* at the same, time assuming table A and table B) or against two different servers; say
* two servers that crashed at about the same time.
*
* <p>Each procedure should implement an interface providing information for this queue.
* for example table related procedures should implement TableProcedureInterface.
* each procedure will be pushed in its own queue, and based on the operation type
* we may take smarter decision. e.g. we can abort all the operations preceding
* a delete table, or similar.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MasterProcedureQueue implements ProcedureRunnableSet {
private static final Log LOG = LogFactory.getLog(MasterProcedureQueue.class);
// Two queues to ensure that server procedures run ahead of table precedures always.
private final ProcedureFairRunQueues<TableName, RunQueue> tableFairQ;
/**
* Rely on basic fair q. ServerCrashProcedure will yield if meta is not assigned. This way, the
* server that was carrying meta should rise to the top of the queue (this is how it used to
* work when we had handlers and ServerShutdownHandler ran). TODO: special handling of servers
* that were carrying system tables on crash; do I need to have these servers have priority?
*
* <p>Apart from the special-casing of meta and system tables, fairq is what we want
*/
private final ProcedureFairRunQueues<ServerName, RunQueue> serverFairQ;
private final ReentrantLock lock = new ReentrantLock();
private final Condition waitCond = lock.newCondition();
private final TableLockManager lockManager;
private final int metaTablePriority;
private final int userTablePriority;
private final int sysTablePriority;
private static final int DEFAULT_SERVER_PRIORITY = 1;
/**
* Keeps count across server and table queues.
*/
private int queueSize;
public MasterProcedureQueue(final Configuration conf, final TableLockManager lockManager) {
this.tableFairQ = new ProcedureFairRunQueues<TableName, RunQueue>(1);
this.serverFairQ = new ProcedureFairRunQueues<ServerName, RunQueue>(1);
this.lockManager = lockManager;
// TODO: should this be part of the HTD?
metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3);
sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2);
userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1);
}
@Override
public void addFront(final Procedure proc) {
lock.lock();
try {
getRunQueueOrCreate(proc).addFront(proc);
queueSize++;
waitCond.signal();
} finally {
lock.unlock();
}
}
@Override
public void addBack(final Procedure proc) {
lock.lock();
try {
getRunQueueOrCreate(proc).addBack(proc);
queueSize++;
waitCond.signal();
} finally {
lock.unlock();
}
}
@Override
public void yield(final Procedure proc) {
addBack(proc);
}
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
public Long poll() {
Long pollResult = null;
lock.lock();
try {
if (queueSize == 0) {
waitCond.await();
if (queueSize == 0) {
return null;
}
}
// For now, let server handling have precedence over table handling; presumption is that it
// is more important handling crashed servers than it is running the
// enabling/disabling tables, etc.
pollResult = doPoll(serverFairQ.poll());
if (pollResult == null) {
pollResult = doPoll(tableFairQ.poll());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
return pollResult;
}
private Long doPoll(final RunQueue rq) {
if (rq == null || !rq.isAvailable()) return null;
this.queueSize--;
return rq.poll();
}
@Override
public void signalAll() {
lock.lock();
try {
waitCond.signalAll();
} finally {
lock.unlock();
}
}
@Override
public void clear() {
lock.lock();
try {
serverFairQ.clear();
tableFairQ.clear();
queueSize = 0;
} finally {
lock.unlock();
}
}
@Override
public int size() {
lock.lock();
try {
return queueSize;
} finally {
lock.unlock();
}
}
@Override
public String toString() {
lock.lock();
try {
return "MasterProcedureQueue size=" + queueSize + ": tableFairQ: " + tableFairQ +
", serverFairQ: " + serverFairQ;
} finally {
lock.unlock();
}
}
@Override
public void completionCleanup(Procedure proc) {
if (proc instanceof TableProcedureInterface) {
TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
boolean tableDeleted;
if (proc.hasException()) {
IOException procEx = proc.getException().unwrapRemoteException();
if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
// create failed because the table already exist
tableDeleted = !(procEx instanceof TableExistsException);
} else {
// the operation failed because the table does not exist
tableDeleted = (procEx instanceof TableNotFoundException);
}
} else {
// the table was deleted
tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
}
if (tableDeleted) {
markTableAsDeleted(iProcTable.getTableName());
}
}
// No cleanup for ServerProcedureInterface types, yet.
}
private RunQueue getRunQueueOrCreate(final Procedure proc) {
if (proc instanceof TableProcedureInterface) {
final TableName table = ((TableProcedureInterface)proc).getTableName();
return getRunQueueOrCreate(table);
}
if (proc instanceof ServerProcedureInterface) {
return getRunQueueOrCreate((ServerProcedureInterface)proc);
}
// TODO: at the moment we only have Table and Server procedures
// if you are implementing a non-table/non-server procedure, you have two options: create
// a group for all the non-table/non-server procedures or try to find a key for your
// non-table/non-server procedures and implement something similar to the TableRunQueue.
throw new UnsupportedOperationException("RQs for non-table procedures are not implemented yet");
}
private TableRunQueue getRunQueueOrCreate(final TableName table) {
final TableRunQueue queue = getRunQueue(table);
if (queue != null) return queue;
return (TableRunQueue)tableFairQ.add(table, createTableRunQueue(table));
}
private ServerRunQueue getRunQueueOrCreate(final ServerProcedureInterface spi) {
final ServerRunQueue queue = getRunQueue(spi.getServerName());
if (queue != null) return queue;
return (ServerRunQueue)serverFairQ.add(spi.getServerName(), createServerRunQueue(spi));
}
private TableRunQueue createTableRunQueue(final TableName table) {
int priority = userTablePriority;
if (table.equals(TableName.META_TABLE_NAME)) {
priority = metaTablePriority;
} else if (table.isSystemTable()) {
priority = sysTablePriority;
}
return new TableRunQueue(priority);
}
private ServerRunQueue createServerRunQueue(final ServerProcedureInterface spi) {
return new ServerRunQueue(DEFAULT_SERVER_PRIORITY);
}
private TableRunQueue getRunQueue(final TableName table) {
return (TableRunQueue)tableFairQ.get(table);
}
private ServerRunQueue getRunQueue(final ServerName sn) {
return (ServerRunQueue)serverFairQ.get(sn);
}
/**
* Try to acquire the write lock on the specified table.
* other operations in the table-queue will be executed after the lock is released.
* @param table Table to lock
* @param purpose Human readable reason for locking the table
* @return true if we were able to acquire the lock on the table, otherwise false.
*/
public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) {
return getRunQueueOrCreate(table).tryExclusiveLock(lockManager, table, purpose);
}
/**
* Release the write lock taken with tryAcquireTableWrite()
* @param table the name of the table that has the write lock
*/
public void releaseTableExclusiveLock(final TableName table) {
getRunQueue(table).releaseExclusiveLock(lockManager, table);
}
/**
* Try to acquire the read lock on the specified table.
* other read operations in the table-queue may be executed concurrently,
* otherwise they have to wait until all the read-locks are released.
* @param table Table to lock
* @param purpose Human readable reason for locking the table
* @return true if we were able to acquire the lock on the table, otherwise false.
*/
public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) {
return getRunQueueOrCreate(table).trySharedLock(lockManager, table, purpose);
}
/**
* Release the read lock taken with tryAcquireTableRead()
* @param table the name of the table that has the read lock
*/
public void releaseTableSharedLock(final TableName table) {
getRunQueue(table).releaseSharedLock(lockManager, table);
}
/**
* Try to acquire the write lock on the specified server.
* @see #releaseServerExclusiveLock(ServerProcedureInterface)
* @param spi Server to lock
* @return true if we were able to acquire the lock on the server, otherwise false.
*/
public boolean tryAcquireServerExclusiveLock(final ServerProcedureInterface spi) {
return getRunQueueOrCreate(spi).tryExclusiveLock();
}
/**
* Release the write lock
* @see #tryAcquireServerExclusiveLock(ServerProcedureInterface)
* @param spi the server that has the write lock
*/
public void releaseServerExclusiveLock(final ServerProcedureInterface spi) {
getRunQueue(spi.getServerName()).releaseExclusiveLock();
}
/**
* Try to acquire the read lock on the specified server.
* @see #releaseServerSharedLock(ServerProcedureInterface)
* @param spi Server to lock
* @return true if we were able to acquire the lock on the server, otherwise false.
*/
public boolean tryAcquireServerSharedLock(final ServerProcedureInterface spi) {
return getRunQueueOrCreate(spi).trySharedLock();
}
/**
* Release the read lock taken
* @see #tryAcquireServerSharedLock(ServerProcedureInterface)
* @param spi the server that has the read lock
*/
public void releaseServerSharedLock(final ServerProcedureInterface spi) {
getRunQueue(spi.getServerName()).releaseSharedLock();
}
/**
* Tries to remove the queue and the table-lock of the specified table.
* If there are new operations pending (e.g. a new create),
* the remove will not be performed.
* @param table the name of the table that should be marked as deleted
* @return true if deletion succeeded, false otherwise meaning that there are
* other new operations pending for that table (e.g. a new create).
*/
protected boolean markTableAsDeleted(final TableName table) {
TableRunQueue queue = getRunQueue(table);
if (queue != null) {
lock.lock();
try {
if (queue.isEmpty() && queue.acquireDeleteLock()) {
tableFairQ.remove(table);
// Remove the table lock
try {
lockManager.tableDeleted(table);
} catch (IOException e) {
LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical
}
} else {
// TODO: If there are no create, we can drop all the other ops
return false;
}
} finally {
lock.unlock();
}
}
return true;
}
private interface RunQueue extends ProcedureFairRunQueues.FairObject {
void addFront(Procedure proc);
void addBack(Procedure proc);
Long poll();
boolean acquireDeleteLock();
}
/**
* Base abstract class for RunQueue implementations.
* Be careful honoring synchronizations in subclasses. In here we protect access but if you are
* acting on a state found in here, be sure dependent code keeps synchronization.
* Implements basic in-memory read/write locking mechanism to prevent procedure steps being run
* in parallel.
*/
private static abstract class AbstractRunQueue implements RunQueue {
// All modification of runnables happens with #lock held.
private final Deque<Long> runnables = new ArrayDeque<Long>();
private final int priority;
private boolean exclusiveLock = false;
private int sharedLock = 0;
public AbstractRunQueue(int priority) {
this.priority = priority;
}
boolean isEmpty() {
return this.runnables.isEmpty();
}
@Override
public boolean isAvailable() {
synchronized (this) {
return !exclusiveLock && !runnables.isEmpty();
}
}
@Override
public int getPriority() {
return this.priority;
}
@Override
public void addFront(Procedure proc) {
this.runnables.addFirst(proc.getProcId());
}
@Override
public void addBack(Procedure proc) {
this.runnables.addLast(proc.getProcId());
}
@Override
public Long poll() {
return this.runnables.poll();
}
@Override
public synchronized boolean acquireDeleteLock() {
return tryExclusiveLock();
}
public synchronized boolean isLocked() {
return isExclusiveLock() || sharedLock > 0;
}
public synchronized boolean isExclusiveLock() {
return this.exclusiveLock;
}
public synchronized boolean trySharedLock() {
if (isExclusiveLock()) return false;
sharedLock++;
return true;
}
public synchronized void releaseSharedLock() {
sharedLock--;
}
/**
* @return True if only one instance of a shared lock outstanding.
*/
synchronized boolean isSingleSharedLock() {
return sharedLock == 1;
}
public synchronized boolean tryExclusiveLock() {
if (isLocked()) return false;
exclusiveLock = true;
return true;
}
public synchronized void releaseExclusiveLock() {
exclusiveLock = false;
}
@Override
public String toString() {
return this.runnables.toString();
}
}
/**
* Run Queue for Server procedures.
*/
private static class ServerRunQueue extends AbstractRunQueue {
public ServerRunQueue(int priority) {
super(priority);
}
}
/**
* Run Queue for a Table. It contains a read-write lock that is used by the
* MasterProcedureQueue to decide if we should fetch an item from this queue
* or skip to another one which will be able to run without waiting for locks.
*/
private static class TableRunQueue extends AbstractRunQueue {
private TableLock tableLock = null;
public TableRunQueue(int priority) {
super(priority);
}
// TODO: Improve run-queue push with TableProcedureInterface.getType()
// we can take smart decisions based on the type of the operation (e.g. create/delete)
@Override
public void addBack(final Procedure proc) {
super.addBack(proc);
}
public synchronized boolean trySharedLock(final TableLockManager lockManager,
final TableName tableName, final String purpose) {
if (isExclusiveLock()) return false;
// Take zk-read-lock
tableLock = lockManager.readLock(tableName, purpose);
try {
tableLock.acquire();
} catch (IOException e) {
LOG.error("failed acquire read lock on " + tableName, e);
tableLock = null;
return false;
}
trySharedLock();
return true;
}
public synchronized void releaseSharedLock(final TableLockManager lockManager,
final TableName tableName) {
releaseTableLock(lockManager, isSingleSharedLock());
releaseSharedLock();
}
public synchronized boolean tryExclusiveLock(final TableLockManager lockManager,
final TableName tableName, final String purpose) {
if (isLocked()) return false;
// Take zk-write-lock
tableLock = lockManager.writeLock(tableName, purpose);
try {
tableLock.acquire();
} catch (IOException e) {
LOG.error("failed acquire write lock on " + tableName, e);
tableLock = null;
return false;
}
tryExclusiveLock();
return true;
}
public synchronized void releaseExclusiveLock(final TableLockManager lockManager,
final TableName tableName) {
releaseTableLock(lockManager, true);
releaseExclusiveLock();
}
private void releaseTableLock(final TableLockManager lockManager, boolean reset) {
for (int i = 0; i < 3; ++i) {
try {
tableLock.release();
if (reset) {
tableLock = null;
}
break;
} catch (IOException e) {
LOG.warn("Could not release the table write-lock", e);
}
}
}
}
}

View File

@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -182,10 +181,8 @@ public class ModifyColumnFamilyProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false; if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock( return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "modify family");
tableName,
EventType.C_M_MODIFY_FAMILY.toString());
} }
@Override @Override

View File

@ -215,10 +215,8 @@ public class ModifyTableProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false; if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock( return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "modify table");
getTableName(),
EventType.C_M_MODIFY_TABLE.toString());
} }
@Override @Override

View File

@ -588,13 +588,13 @@ implements ServerProcedureInterface {
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.getMasterServices().isServerCrashProcessingEnabled()) return false; if (env.waitServerCrashProcessingEnabled(this)) return false;
return env.getProcedureQueue().tryAcquireServerExclusiveLock(this); return env.getProcedureQueue().tryAcquireServerExclusiveLock(getServerName());
} }
@Override @Override
protected void releaseLock(final MasterProcedureEnv env) { protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseServerExclusiveLock(this); env.getProcedureQueue().releaseServerExclusiveLock(getServerName());
} }
@Override @Override
@ -788,6 +788,11 @@ implements ServerProcedureInterface {
return this.carryingMeta; return this.carryingMeta;
} }
@Override
public ServerOperationType getServerOperationType() {
return ServerOperationType.CRASH_HANDLER;
}
/** /**
* For this procedure, yield at end of each successful flow step so that all crashed servers * For this procedure, yield at end of each successful flow step so that all crashed servers
* can make progress rather than do the default which has each procedure running to completion * can make progress rather than do the default which has each procedure running to completion

View File

@ -28,6 +28,10 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public interface ServerProcedureInterface { public interface ServerProcedureInterface {
public enum ServerOperationType {
CRASH_HANDLER
};
/** /**
* @return Name of this server instance. * @return Name of this server instance.
*/ */
@ -37,4 +41,12 @@ public interface ServerProcedureInterface {
* @return True if this server has an hbase:meta table region. * @return True if this server has an hbase:meta table region.
*/ */
boolean hasMetaTableRegion(); boolean hasMetaTableRegion();
/**
* Given an operation type we can take decisions about what to do with pending operations.
* e.g. if we get a crash handler and we have some assignment operation pending
* we can abort those operations.
* @return the operation type that the procedure is executing.
*/
ServerOperationType getServerOperationType();
} }

View File

@ -181,7 +181,7 @@ public class TruncateTableProcedure
@Override @Override
protected boolean acquireLock(final MasterProcedureEnv env) { protected boolean acquireLock(final MasterProcedureEnv env) {
if (!env.isInitialized()) return false; if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "truncate table"); return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "truncate table");
} }

View File

@ -129,14 +129,14 @@ public class TestMaster {
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
HMaster m = cluster.getMaster(); HMaster m = cluster.getMaster();
try { try {
m.initialized = false; // fake it, set back later m.setInitialized(false); // fake it, set back later
HRegionInfo meta = HRegionInfo.FIRST_META_REGIONINFO; HRegionInfo meta = HRegionInfo.FIRST_META_REGIONINFO;
m.move(meta.getEncodedNameAsBytes(), null); m.move(meta.getEncodedNameAsBytes(), null);
fail("Region should not be moved since master is not initialized"); fail("Region should not be moved since master is not initialized");
} catch (IOException ioe) { } catch (IOException ioe) {
assertTrue(ioe instanceof PleaseHoldException); assertTrue(ioe instanceof PleaseHoldException);
} finally { } finally {
m.initialized = true; m.setInitialized(true);
} }
} }
@ -173,13 +173,13 @@ public class TestMaster {
try { try {
List<HRegionInfo> tableRegions = admin.getTableRegions(tableName); List<HRegionInfo> tableRegions = admin.getTableRegions(tableName);
master.initialized = false; // fake it, set back later master.setInitialized(false); // fake it, set back later
admin.move(tableRegions.get(0).getEncodedNameAsBytes(), null); admin.move(tableRegions.get(0).getEncodedNameAsBytes(), null);
fail("Region should not be moved since master is not initialized"); fail("Region should not be moved since master is not initialized");
} catch (IOException ioe) { } catch (IOException ioe) {
assertTrue(StringUtils.stringifyException(ioe).contains("PleaseHoldException")); assertTrue(StringUtils.stringifyException(ioe).contains("PleaseHoldException"));
} finally { } finally {
master.initialized = true; master.setInitialized(true);
TEST_UTIL.deleteTable(tableName); TEST_UTIL.deleteTable(tableName);
} }
} }

View File

@ -308,7 +308,7 @@ public class TestMasterNoCluster {
try { try {
// Wait till master is initialized. // Wait till master is initialized.
while (!master.initialized) Threads.sleep(10); while (!master.isInitialized()) Threads.sleep(10);
LOG.info("Master is initialized"); LOG.info("Master is initialized");
assertFalse("The dead server should not be pulled in", assertFalse("The dead server should not be pulled in",

View File

@ -0,0 +1,181 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@Category({MasterTests.class, MediumTests.class})
public class TestMasterProcedureEvents {
private static final Log LOG = LogFactory.getLog(TestCreateTableProcedure.class);
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static long nonceGroup = HConstants.NO_NONCE;
private static long nonce = HConstants.NO_NONCE;
private static void setupConf(Configuration conf) {
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8);
conf.setBoolean("hbase.procedure.store.wal.use.hsync", false);
}
@BeforeClass
public static void setupCluster() throws Exception {
setupConf(UTIL.getConfiguration());
UTIL.startMiniCluster(3);
}
@AfterClass
public static void cleanupTest() throws Exception {
try {
UTIL.shutdownMiniCluster();
} catch (Exception e) {
LOG.warn("failure shutting down cluster", e);
}
}
@Test
public void testMasterInitializedEvent() throws Exception {
TableName tableName = TableName.valueOf("testMasterInitializedEvent");
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue();
HRegionInfo hri = new HRegionInfo(tableName);
HTableDescriptor htd = new HTableDescriptor(tableName);
HColumnDescriptor hcd = new HColumnDescriptor("f");
htd.addFamily(hcd);
while (!master.isInitialized()) Thread.sleep(250);
master.setInitialized(false); // fake it, set back later
CreateTableProcedure proc = new CreateTableProcedure(
procExec.getEnvironment(), htd, new HRegionInfo[] { hri });
long pollCalls = procSched.getPollCalls();
long nullPollCalls = procSched.getNullPollCalls();
long procId = procExec.submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
for (int i = 0; i < 10; ++i) {
Thread.sleep(100);
assertEquals(pollCalls + 1, procSched.getPollCalls());
assertEquals(nullPollCalls, procSched.getNullPollCalls());
}
master.setInitialized(true);
ProcedureTestingUtility.waitProcedure(procExec, procId);
assertEquals(pollCalls + 2, procSched.getPollCalls());
assertEquals(nullPollCalls, procSched.getNullPollCalls());
}
@Test
public void testServerCrashProcedureEvent() throws Exception {
TableName tableName = TableName.valueOf("testServerCrashProcedureEventTb");
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue();
while (!master.isServerCrashProcessingEnabled() || !master.isInitialized() ||
master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
Thread.sleep(25);
}
UTIL.createTable(tableName, HBaseTestingUtility.COLUMNS[0]);
try (Table t = UTIL.getConnection().getTable(tableName)) {
// Load the table with a bit of data so some logs to split and some edits in each region.
UTIL.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
}
master.setServerCrashProcessingEnabled(false); // fake it, set back later
long pollCalls = procSched.getPollCalls();
long nullPollCalls = procSched.getNullPollCalls();
// Kill a server. Master will notice but do nothing other than add it to list of dead servers.
HRegionServer hrs = getServerWithRegions();
boolean carryingMeta = master.getAssignmentManager()
.isCarryingMeta(hrs.getServerName()) == AssignmentManager.ServerHostRegion.HOSTING_REGION;
UTIL.getHBaseCluster().killRegionServer(hrs.getServerName());
hrs.join();
// Wait until the expiration of the server has arrived at the master. We won't process it
// by queuing a ServerCrashProcedure because we have disabled crash processing... but wait
// here so ServerManager gets notice and adds expired server to appropriate queues.
while (!master.getServerManager().isServerDead(hrs.getServerName())) Thread.sleep(10);
// Do some of the master processing of dead servers so when SCP runs, it has expected 'state'.
master.getServerManager().moveFromOnelineToDeadServers(hrs.getServerName());
long procId = procExec.submitProcedure(
new ServerCrashProcedure(hrs.getServerName(), true, carryingMeta));
for (int i = 0; i < 10; ++i) {
Thread.sleep(100);
assertEquals(pollCalls + 1, procSched.getPollCalls());
assertEquals(nullPollCalls, procSched.getNullPollCalls());
}
// Now, reenable processing else we can't get a lock on the ServerCrashProcedure.
master.setServerCrashProcessingEnabled(true);
ProcedureTestingUtility.waitProcedure(procExec, procId);
LOG.debug("server crash processing poll calls: " + procSched.getPollCalls());
assertTrue(procSched.getPollCalls() >= (pollCalls + 2));
assertEquals(nullPollCalls, procSched.getNullPollCalls());
UTIL.deleteTable(tableName);
}
private HRegionServer getServerWithRegions() {
for (int i = 0; i < 3; ++i) {
HRegionServer hrs = UTIL.getHBaseCluster().getRegionServer(i);
if (hrs.getNumberOfOnlineRegions() > 0) {
return hrs;
}
}
return null;
}
}

View File

@ -18,10 +18,6 @@
package org.apache.hadoop.hbase.master.procedure; package org.apache.hadoop.hbase.master.procedure;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -39,23 +35,29 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@Category(SmallTests.class) import static org.junit.Assert.assertEquals;
public class TestMasterProcedureQueue { import static org.junit.Assert.assertFalse;
private static final Log LOG = LogFactory.getLog(TestMasterProcedureQueue.class); import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
private MasterProcedureQueue queue; @Category({MasterTests.class, SmallTests.class})
public class TestMasterProcedureScheduler {
private static final Log LOG = LogFactory.getLog(TestMasterProcedureScheduler.class);
private MasterProcedureScheduler queue;
private Configuration conf; private Configuration conf;
@Before @Before
public void setUp() throws IOException { public void setUp() throws IOException {
conf = HBaseConfiguration.create(); conf = HBaseConfiguration.create();
queue = new MasterProcedureQueue(conf, new TableLockManager.NullTableLockManager()); queue = new MasterProcedureScheduler(conf, new TableLockManager.NullTableLockManager());
} }
@After @After
@ -65,7 +67,7 @@ public class TestMasterProcedureQueue {
@Test @Test
public void testConcurrentCreateDelete() throws Exception { public void testConcurrentCreateDelete() throws Exception {
final MasterProcedureQueue procQueue = queue; final MasterProcedureScheduler procQueue = queue;
final TableName table = TableName.valueOf("testtb"); final TableName table = TableName.valueOf("testtb");
final AtomicBoolean running = new AtomicBoolean(true); final AtomicBoolean running = new AtomicBoolean(true);
final AtomicBoolean failure = new AtomicBoolean(false); final AtomicBoolean failure = new AtomicBoolean(false);
@ -135,9 +137,14 @@ public class TestMasterProcedureQueue {
for (int j = 1; j <= NUM_ITEMS; ++j) { for (int j = 1; j <= NUM_ITEMS; ++j) {
for (int i = 1; i <= NUM_TABLES; ++i) { for (int i = 1; i <= NUM_TABLES; ++i) {
Long procId = queue.poll(); Procedure proc = queue.poll();
assertTrue(proc != null);
TableName tableName = ((TestTableProcedure)proc).getTableName();
queue.tryAcquireTableExclusiveLock(tableName, "test");
queue.releaseTableExclusiveLock(tableName);
queue.completionCleanup(proc);
assertEquals(--count, queue.size()); assertEquals(--count, queue.size());
assertEquals(i * 1000 + j, procId.longValue()); assertEquals(i * 1000 + j, proc.getProcId());
} }
} }
assertEquals(0, queue.size()); assertEquals(0, queue.size());
@ -164,7 +171,7 @@ public class TestMasterProcedureQueue {
assertFalse(queue.markTableAsDeleted(tableName)); assertFalse(queue.markTableAsDeleted(tableName));
// fetch item and take a lock // fetch item and take a lock
assertEquals(1, queue.poll().longValue()); assertEquals(1, queue.poll().getProcId());
// take the xlock // take the xlock
assertTrue(queue.tryAcquireTableExclusiveLock(tableName, "write")); assertTrue(queue.tryAcquireTableExclusiveLock(tableName, "write"));
// table can't be deleted because we have the lock // table can't be deleted because we have the lock
@ -195,7 +202,7 @@ public class TestMasterProcedureQueue {
for (int i = 1; i <= nitems; ++i) { for (int i = 1; i <= nitems; ++i) {
// fetch item and take a lock // fetch item and take a lock
assertEquals(i, queue.poll().longValue()); assertEquals(i, queue.poll().getProcId());
// take the rlock // take the rlock
assertTrue(queue.tryAcquireTableSharedLock(tableName, "read " + i)); assertTrue(queue.tryAcquireTableSharedLock(tableName, "read " + i));
// table can't be deleted because we have locks and/or items in the queue // table can't be deleted because we have locks and/or items in the queue
@ -233,24 +240,24 @@ public class TestMasterProcedureQueue {
TableProcedureInterface.TableOperationType.READ)); TableProcedureInterface.TableOperationType.READ));
// Fetch the 1st item and take the write lock // Fetch the 1st item and take the write lock
Long procId = queue.poll(); long procId = queue.poll().getProcId();
assertEquals(1, procId.longValue()); assertEquals(1, procId);
assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
// Fetch the 2nd item and verify that the lock can't be acquired // Fetch the 2nd item and verify that the lock can't be acquired
assertEquals(null, queue.poll()); assertEquals(null, queue.poll(0));
// Release the write lock and acquire the read lock // Release the write lock and acquire the read lock
queue.releaseTableExclusiveLock(tableName); queue.releaseTableExclusiveLock(tableName);
// Fetch the 2nd item and take the read lock // Fetch the 2nd item and take the read lock
procId = queue.poll(); procId = queue.poll().getProcId();
assertEquals(2, procId.longValue()); assertEquals(2, procId);
assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
// Fetch the 3rd item and verify that the lock can't be acquired // Fetch the 3rd item and verify that the lock can't be acquired
procId = queue.poll(); procId = queue.poll().getProcId();
assertEquals(3, procId.longValue()); assertEquals(3, procId);
assertEquals(false, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); assertEquals(false, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
// release the rdlock of item 2 and take the wrlock for the 3d item // release the rdlock of item 2 and take the wrlock for the 3d item
@ -258,19 +265,19 @@ public class TestMasterProcedureQueue {
assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
// Fetch 4th item and verify that the lock can't be acquired // Fetch 4th item and verify that the lock can't be acquired
assertEquals(null, queue.poll()); assertEquals(null, queue.poll(0));
// Release the write lock and acquire the read lock // Release the write lock and acquire the read lock
queue.releaseTableExclusiveLock(tableName); queue.releaseTableExclusiveLock(tableName);
// Fetch the 4th item and take the read lock // Fetch the 4th item and take the read lock
procId = queue.poll(); procId = queue.poll().getProcId();
assertEquals(4, procId.longValue()); assertEquals(4, procId);
assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
// Fetch the 4th item and take the read lock // Fetch the 4th item and take the read lock
procId = queue.poll(); procId = queue.poll().getProcId();
assertEquals(5, procId.longValue()); assertEquals(5, procId);
assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
// Release 4th and 5th read-lock // Release 4th and 5th read-lock
@ -370,11 +377,11 @@ public class TestMasterProcedureQueue {
} }
public static class TestTableProcSet { public static class TestTableProcSet {
private final MasterProcedureQueue queue; private final MasterProcedureScheduler queue;
private Map<Long, TableProcedureInterface> procsMap = private Map<Long, TableProcedureInterface> procsMap =
new ConcurrentHashMap<Long, TableProcedureInterface>(); new ConcurrentHashMap<Long, TableProcedureInterface>();
public TestTableProcSet(final MasterProcedureQueue queue) { public TestTableProcSet(final MasterProcedureScheduler queue) {
this.queue = queue; this.queue = queue;
} }
@ -394,8 +401,8 @@ public class TestMasterProcedureQueue {
TableProcedureInterface proc = null; TableProcedureInterface proc = null;
boolean avail = false; boolean avail = false;
while (!avail) { while (!avail) {
Long procId = queue.poll(); Procedure xProc = queue.poll();
proc = procId != null ? procsMap.remove(procId) : null; proc = xProc != null ? procsMap.remove(xProc.getProcId()) : null;
if (proc == null) break; if (proc == null) break;
switch (proc.getTableOperationType()) { switch (proc.getTableOperationType()) {
case CREATE: case CREATE:
@ -411,7 +418,7 @@ public class TestMasterProcedureQueue {
} }
if (!avail) { if (!avail) {
addFront(proc); addFront(proc);
LOG.debug("yield procId=" + procId); LOG.debug("yield procId=" + proc);
} }
} }
return proc; return proc;