HBASE-17605 Changes

- Moved locks out of MasterProcedureScheduler#Queue. One Queue object is used for each namespace/table, which aren't more than 100. So we don't need complexity arising from all functionalities being in one place. SchemaLocking now owns locks and locking implementaion has been moved to procedure2 package.
- Removed NamespaceQueue because it wasn't being used as Queue (add,peek,poll,etc functions threw UnsupportedOperationException). It's was only used for locks on namespaces. Now that locks have been moved out of Queue class, it's not needed anymore.
- Remoed RegionEvent which was there only for locking on regions. Tables/namespaces used locking from Queue class and regions couldn't (there are no separate proc queue at region level), hence the redundance. Now that locking is separate, we can use the same for regions too.
- Removed QueueInterface class. No declarations, except one implementaion, which makes the point of having an interface moot.
- Removed QueueImpl, which was the only concrete implementation of abstract Queue class. Moved functions to Queue class itself to avoid unnecessary level in inheritance hierarchy.
- Removed ProcedureEventQueue class which was just a wrapper around ArrayDeque class. But we now have ProcedureWaitQueue as 'Type class'.
- Encapsulated table priority related stuff in a single class.
- Removed some unused functions.
Change-Id: I6a60424cb41e280bc111703053aa179d9071ba17
This commit is contained in:
Apekshit Sharma 2017-01-24 14:31:28 -08:00
parent a05abd83ef
commit 826b9436fb
11 changed files with 502 additions and 574 deletions

View File

@ -232,7 +232,7 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
if (event.isReady()) {
return false;
}
suspendProcedure(event, procedure);
waitProcedure(event.getSuspendedProcedures(), procedure);
return true;
}
}
@ -266,7 +266,7 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
if (isTraceEnabled) {
LOG.trace("Wake event " + event);
}
waitingCount += popEventWaitingObjects(event);
waitingCount += wakeWaitingProcedures(event.getSuspendedProcedures());
}
}
wakePollIfNeeded(waitingCount);
@ -275,21 +275,23 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
}
}
protected int popEventWaitingObjects(final ProcedureEvent event) {
return popEventWaitingProcedures(event);
}
protected int popEventWaitingProcedures(final ProcedureEventQueue event) {
int count = 0;
while (event.hasWaitingProcedures()) {
wakeProcedure(event.popWaitingProcedure(false));
count++;
/**
* Wakes up given waiting procedures by pushing them back into scheduler queues.
* @return size of given {@code waitQueue}.
*/
protected int wakeWaitingProcedures(final ProcedureDeque waitQueue) {
int count = waitQueue.size();
// wakeProcedure adds to the front of queue, so we start from last in the
// waitQueue' queue, so that the procedure which was added first goes in the front for
// the scheduler queue.
while (!waitQueue.isEmpty()) {
wakeProcedure(waitQueue.removeLast());
}
return count;
}
protected void suspendProcedure(final ProcedureEventQueue event, final Procedure procedure) {
event.suspendProcedure(procedure);
protected void waitProcedure(final ProcedureDeque waitQueue, final Procedure proc) {
waitQueue.addLast(proc);
}
protected void wakeProcedure(final Procedure procedure) {
@ -308,10 +310,11 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
}
protected void wakePollIfNeeded(final int waitingCount) {
if (waitingCount > 1) {
schedWaitCond.signalAll();
} else if (waitingCount > 0) {
if (waitingCount <= 0) return;
if (waitingCount == 1) {
schedWaitCond.signal();
} else {
schedWaitCond.signalAll();
}
}
}

View File

@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
/**
* Locking for mutual exclusion between procedures. Only by procedure framework internally.
* {@link LockAndQueue} has two purposes:
* <ol>
* <li>Acquire/release exclusive/shared locks</li>
* <li>Maintain a list of procedures waiting for this lock<br>
* To do so, {@link LockAndQueue} extends {@link ProcedureDeque} class. Using inheritance over
* composition for this need is unusual, but the choice is motivated by million regions
* assignment case as it will reduce memory footprint and number of objects to be GCed.
* </ol>
*
* NOT thread-safe. Needs external concurrency control. For eg. Uses in MasterProcedureScheduler are
* guarded by schedLock().
* <br>
* There is no need of 'volatile' keyword for member variables because of memory synchronization
* guarantees of locks (see 'Memory Synchronization',
* http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html)
* <br>
* We do not implement Lock interface because we need exclusive + shared locking, and also
* because try-lock functions require procedure id.
* <br>
* We do not use ReentrantReadWriteLock directly because of its high memory overhead.
*/
public class LockAndQueue extends ProcedureDeque implements LockStatus {
private long exclusiveLockProcIdOwner = Long.MIN_VALUE;
private int sharedLock = 0;
// ======================================================================
// Lock Status
// ======================================================================
@Override
public boolean isLocked() {
return hasExclusiveLock() || sharedLock > 0;
}
@Override
public boolean hasExclusiveLock() {
return this.exclusiveLockProcIdOwner != Long.MIN_VALUE;
}
@Override
public boolean isLockOwner(long procId) {
return exclusiveLockProcIdOwner == procId;
}
@Override
public boolean hasParentLock(final Procedure proc) {
return proc.hasParent() && (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId()));
}
@Override
public boolean hasLockAccess(final Procedure proc) {
return isLockOwner(proc.getProcId()) || hasParentLock(proc);
}
@Override
public long getExclusiveLockProcIdOwner() {
return exclusiveLockProcIdOwner;
}
@Override
public int getSharedLockCount() {
return sharedLock;
}
// ======================================================================
// try/release Shared/Exclusive lock
// ======================================================================
public boolean trySharedLock() {
if (hasExclusiveLock()) return false;
sharedLock++;
return true;
}
public boolean releaseSharedLock() {
return --sharedLock == 0;
}
public boolean tryExclusiveLock(final Procedure proc) {
if (isLocked()) return hasLockAccess(proc);
exclusiveLockProcIdOwner = proc.getProcId();
return true;
}
public boolean releaseExclusiveLock(final Procedure proc) {
if (isLockOwner(proc.getProcId())) {
exclusiveLockProcIdOwner = Long.MIN_VALUE;
return true;
}
return false;
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
/**
* Interface to get status of a Lock without getting access to acquire/release lock.
* Currently used in MasterProcedureScheduler where we want to give Queues access to lock's
* status for scheduling purposes, but not the ability to acquire/release it.
*/
public interface LockStatus {
boolean isLocked();
boolean hasExclusiveLock();
boolean isLockOwner(long procId);
boolean hasParentLock(final Procedure proc);
boolean hasLockAccess(final Procedure proc);
long getExclusiveLockProcIdOwner();
int getSharedLockCount();
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUTKey WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.procedure2.Procedure;
import java.util.ArrayDeque;
/**
* Type class.
* For conceptual purpose only. Seeing ProcedureDeque as type instead of just ArrayDeque gives
* more understanding that it's a queue of waiting procedures.
*/
@InterfaceAudience.Private
public class ProcedureDeque extends ArrayDeque<Procedure> {
}

View File

@ -22,34 +22,32 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Basic ProcedureEvent that contains an "object", which can be a
* description or a reference to the resource to wait on, and a
* queue for suspended procedures.
* Basic ProcedureEvent that contains an "object", which can be a description or a reference to the
* resource to wait on, and a queue for suspended procedures.
* Access to suspended procedures queue is 'synchronized' on the event itself.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ProcedureEvent<T> extends ProcedureEventQueue {
public class ProcedureEvent<T> {
private final T object;
private boolean ready = false;
private ProcedureDeque suspendedProcedures = new ProcedureDeque();
public ProcedureEvent(final T object) {
this.object = object;
}
public T getObject() {
return object;
}
public synchronized boolean isReady() {
return ready;
}
@InterfaceAudience.Private
protected synchronized void setReady(final boolean isReady) {
synchronized void setReady(final boolean isReady) {
this.ready = isReady;
}
public ProcedureDeque getSuspendedProcedures() {
return suspendedProcedures;
}
@Override
public String toString() {
return getClass().getSimpleName() + "(" + object + ")";

View File

@ -1,85 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayDeque;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Basic queue to store suspended procedures.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ProcedureEventQueue {
private static final Log LOG = LogFactory.getLog(ProcedureEventQueue.class);
private ArrayDeque<Procedure> waitingProcedures = null;
public ProcedureEventQueue() {
}
@InterfaceAudience.Private
public synchronized void suspendProcedure(final Procedure proc) {
if (waitingProcedures == null) {
waitingProcedures = new ArrayDeque<Procedure>();
}
waitingProcedures.addLast(proc);
}
@InterfaceAudience.Private
public synchronized void removeProcedure(final Procedure proc) {
if (waitingProcedures != null) {
waitingProcedures.remove(proc);
}
}
@InterfaceAudience.Private
public synchronized boolean hasWaitingProcedures() {
return waitingProcedures != null;
}
@InterfaceAudience.Private
public synchronized Procedure popWaitingProcedure(final boolean popFront) {
// it will be nice to use IterableList on a procedure and avoid allocations...
Procedure proc = popFront ? waitingProcedures.removeFirst() : waitingProcedures.removeLast();
if (waitingProcedures.isEmpty()) {
waitingProcedures = null;
}
return proc;
}
@VisibleForTesting
public synchronized void clear() {
waitingProcedures = null;
}
@VisibleForTesting
public synchronized int size() {
if (waitingProcedures != null) {
return waitingProcedures.size();
}
return 0;
}
}

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.hbase.procedure2;
import java.util.ArrayDeque;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@ -29,7 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class SimpleProcedureScheduler extends AbstractProcedureScheduler {
private final ArrayDeque<Procedure> runnables = new ArrayDeque<Procedure>();
private final ProcedureDeque runnables = new ProcedureDeque();
@Override
protected void enqueue(final Procedure procedure, final boolean addFront) {

View File

@ -42,6 +42,13 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Procedure to allow clients and external admin tools to take locks on table/namespace/regions.
* This procedure when scheduled, acquires specified locks, suspends itself and waits for :
* - call to unlock: if lock request came from the process itself, say master chore.
* - Timeout : if lock request came from RPC. On timeout, evaluates if it should continue holding
* the lock or not based on last heartbeat timestamp.
*/
@InterfaceAudience.Private
public final class LockProcedure extends Procedure<MasterProcedureEnv>
implements TableProcedureInterface {
@ -71,7 +78,7 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
// this is for internal working
private boolean hasLock;
private final ProcedureEvent<LockProcedure> event = new ProcedureEvent<LockProcedure>(this);
private final ProcedureEvent<LockProcedure> event = new ProcedureEvent<>(this);
// True if this proc acquired relevant locks. This value is for client checks.
private final AtomicBoolean locked = new AtomicBoolean(false);
// Last system time (in ms) when client sent the heartbeat.

View File

@ -141,14 +141,6 @@ public class MasterProcedureEnv implements ConfigurationObserver {
return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc);
}
public void wake(ProcedureEvent event) {
procSched.wakeEvent(event);
}
public void suspend(ProcedureEvent event) {
procSched.suspendEvent(event);
}
public void setEventReady(ProcedureEvent event, boolean isReady) {
if (isReady) {
procSched.wakeEvent(event);

View File

@ -146,7 +146,7 @@ public class TestMasterProcedureEvents {
// check that nothing is in the event queue
LOG.debug("checking " + event);
assertEquals(false, event.isReady());
assertEquals(0, event.size());
assertEquals(0, event.getSuspendedProcedures().size());
// submit the procedure
LOG.debug("submit " + proc);
@ -154,12 +154,12 @@ public class TestMasterProcedureEvents {
// wait until the event is in the queue (proc executed and got into suspended state)
LOG.debug("wait procedure suspended on " + event);
while (event.size() < 1) Thread.sleep(25);
while (event.getSuspendedProcedures().size() < 1) Thread.sleep(25);
// check that the proc is in the event queue
LOG.debug("checking " + event + " size=" + event.size());
LOG.debug("checking " + event + " size=" + event.getSuspendedProcedures().size());
assertEquals(false, event.isReady());
assertEquals(1, event.size());
assertEquals(1, event.getSuspendedProcedures().size());
// wake the event
LOG.debug("wake " + event);
@ -172,7 +172,7 @@ public class TestMasterProcedureEvents {
// check that nothing is in the event queue and the event is not suspended
assertEquals(true, event.isReady());
assertEquals(0, event.size());
assertEquals(0, event.getSuspendedProcedures().size());
LOG.debug("completed execution of " + proc +
" pollCalls=" + (procSched.getPollCalls() - startPollCalls) +
" nullPollCalls=" + (procSched.getNullPollCalls() - startNullPollCalls));