HBASE-16642 Use DelayQueue instead of TimeoutBlockingQueue

Signed-off-by: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
This commit is contained in:
Hiroshi Ikeda 2016-10-13 20:21:46 -07:00 committed by Matteo Bertozzi
parent 91a7bbd581
commit 9a94dc90b4
3 changed files with 68 additions and 420 deletions

View File

@ -29,12 +29,15 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
@ -48,8 +51,6 @@ import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -96,17 +97,58 @@ public class ProcedureExecutor<TEnvironment> {
}
/**
* Used by the TimeoutBlockingQueue to get the timeout interval of the procedure
* Used by the DelayQueue to get the timeout interval of the procedure
*/
private static class ProcedureTimeoutRetriever implements TimeoutRetriever<Procedure> {
@Override
public long getTimeout(Procedure proc) {
return proc.getTimeRemaining();
private static class DelayedContainer implements Delayed {
static final DelayedContainer POISON = new DelayedContainer();
/** null if poison */
final Procedure proc;
final long timeoutTime;
DelayedContainer(Procedure proc) {
assert proc != null;
this.proc = proc;
this.timeoutTime = proc.getLastUpdate() + proc.getTimeout();
}
DelayedContainer() {
this.proc = null;
this.timeoutTime = Long.MIN_VALUE;
}
@Override
public TimeUnit getTimeUnit(Procedure proc) {
return TimeUnit.MILLISECONDS;
public long getDelay(TimeUnit unit) {
long currentTime = EnvironmentEdgeManager.currentTime();
if (currentTime >= timeoutTime) {
return 0;
}
return unit.convert(timeoutTime - currentTime, TimeUnit.MICROSECONDS);
}
/**
* @throws NullPointerException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
*/
@Override
public int compareTo(Delayed o) {
return Long.compare(timeoutTime, ((DelayedContainer)o).timeoutTime);
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (! (obj instanceof DelayedContainer)) {
return false;
}
return Objects.equals(proc, ((DelayedContainer)obj).proc);
}
@Override
public int hashCode() {
return proc != null ? proc.hashCode() : 0;
}
}
@ -239,8 +281,8 @@ public class ProcedureExecutor<TEnvironment> {
* Timeout Queue that contains Procedures in a WAITING_TIMEOUT state
* or periodic procedures.
*/
private final TimeoutBlockingQueue<Procedure> waitingTimeout =
new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever());
private final DelayQueue<DelayedContainer> waitingTimeout =
new DelayQueue<DelayedContainer>();
/**
* Scheduler/Queue that contains runnable procedures.
@ -544,7 +586,7 @@ public class ProcedureExecutor<TEnvironment> {
LOG.info("Stopping the procedure executor");
scheduler.stop();
waitingTimeout.signalAll();
waitingTimeout.add(DelayedContainer.POISON);
}
public void join() {
@ -628,7 +670,7 @@ public class ProcedureExecutor<TEnvironment> {
*/
public void addChore(final ProcedureInMemoryChore chore) {
chore.setState(ProcedureState.RUNNABLE);
waitingTimeout.add(chore);
waitingTimeout.add(new DelayedContainer(chore));
}
/**
@ -638,7 +680,7 @@ public class ProcedureExecutor<TEnvironment> {
*/
public boolean removeChore(final ProcedureInMemoryChore chore) {
chore.setState(ProcedureState.FINISHED);
return waitingTimeout.remove(chore);
return waitingTimeout.remove(new DelayedContainer(chore));
}
/**
@ -927,15 +969,16 @@ public class ProcedureExecutor<TEnvironment> {
private void timeoutLoop() {
while (isRunning()) {
Procedure proc = waitingTimeout.poll();
if (proc == null) continue;
if (proc.getTimeRemaining() > 100) {
// got an early wake, maybe a stop?
// re-enqueue the task in case was not a stop or just a signal
waitingTimeout.add(proc);
Procedure proc;
try {
proc = waitingTimeout.take().proc;
} catch (InterruptedException e) {
// Just consume the interruption.
continue;
}
if (proc == null) { // POISON to stop
break;
}
// ----------------------------------------------------------------------------
// TODO-MAYBE: Should we provide a notification to the store with the
@ -955,8 +998,8 @@ public class ProcedureExecutor<TEnvironment> {
} catch (Throwable e) {
LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e);
}
proc.setStartTime(EnvironmentEdgeManager.currentTime());
if (proc.isRunnable()) waitingTimeout.add(proc);
proc.updateTimestamp();
if (proc.isRunnable()) waitingTimeout.add(new DelayedContainer(proc));
}
continue;
}
@ -970,8 +1013,6 @@ public class ProcedureExecutor<TEnvironment> {
store.update(proc);
scheduler.addFront(proc);
continue;
} else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) {
waitingTimeout.add(proc);
}
}
}
@ -1171,7 +1212,7 @@ public class ProcedureExecutor<TEnvironment> {
procedure.setState(ProcedureState.WAITING);
break;
case WAITING_TIMEOUT:
waitingTimeout.add(procedure);
waitingTimeout.add(new DelayedContainer(procedure));
break;
default:
break;
@ -1179,7 +1220,7 @@ public class ProcedureExecutor<TEnvironment> {
}
}
} else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
waitingTimeout.add(procedure);
waitingTimeout.add(new DelayedContainer(procedure));
} else if (!isSuspended) {
// No subtask, so we are done
procedure.setState(ProcedureState.FINISHED);

View File

@ -1,234 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.util;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class TimeoutBlockingQueue<E> {
public static interface TimeoutRetriever<T> {
long getTimeout(T object);
TimeUnit getTimeUnit(T object);
}
private final ReentrantLock lock = new ReentrantLock();
private final Condition waitCond = lock.newCondition();
private final TimeoutRetriever<? super E> timeoutRetriever;
private E[] objects;
private int head = 0;
private int tail = 0;
public TimeoutBlockingQueue(TimeoutRetriever<? super E> timeoutRetriever) {
this(32, timeoutRetriever);
}
@SuppressWarnings("unchecked")
public TimeoutBlockingQueue(int capacity, TimeoutRetriever<? super E> timeoutRetriever) {
this.objects = (E[])new Object[capacity];
this.timeoutRetriever = timeoutRetriever;
}
public void dump() {
for (int i = 0; i < objects.length; ++i) {
if (i == head) {
System.out.print("[" + objects[i] + "] ");
} else if (i == tail) {
System.out.print("]" + objects[i] + "[ ");
} else {
System.out.print(objects[i] + " ");
}
}
System.out.println();
}
public void clear() {
lock.lock();
try {
if (head != tail) {
for (int i = head; i < tail; ++i) {
objects[i] = null;
}
head = 0;
tail = 0;
waitCond.signal();
}
} finally {
lock.unlock();
}
}
public void add(E e) {
if (e == null) throw new NullPointerException();
lock.lock();
try {
addElement(e);
waitCond.signal();
} finally {
lock.unlock();
}
}
public boolean remove(E e) {
if (e == null) return false;
lock.lock();
try {
for (int i = 0; i < objects.length; ++i) {
if (e.equals(objects[i])) {
objects[i] = null;
return true;
}
}
return false;
} finally {
lock.unlock();
}
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
public E poll() {
lock.lock();
try {
if (isEmpty()) {
waitCond.await();
return null;
}
E elem = objects[head];
long nanos = getNanosTimeout(elem);
nanos = waitCond.awaitNanos(nanos);
return nanos > 0 ? null : removeFirst();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
lock.unlock();
}
}
public int size() {
return tail - head;
}
public boolean isEmpty() {
return (tail - head) == 0;
}
public void signalAll() {
lock.lock();
try {
waitCond.signalAll();
} finally {
lock.unlock();
}
}
private void addElement(E elem) {
int size = (tail - head);
if ((objects.length - size) == 0) {
int capacity = size + ((size < 64) ? (size + 2) : (size >> 1));
E[] newObjects = (E[])new Object[capacity];
if (compareTimeouts(objects[tail - 1], elem) <= 0) {
// Append
System.arraycopy(objects, head, newObjects, 0, tail);
tail -= head;
newObjects[tail++] = elem;
} else if (compareTimeouts(objects[head], elem) > 0) {
// Prepend
System.arraycopy(objects, head, newObjects, 1, tail);
newObjects[0] = elem;
tail -= (head - 1);
} else {
// Insert in the middle
int index = upperBound(head, tail - 1, elem);
int newIndex = (index - head);
System.arraycopy(objects, head, newObjects, 0, newIndex);
newObjects[newIndex] = elem;
System.arraycopy(objects, index, newObjects, newIndex + 1, tail - index);
tail -= (head - 1);
}
head = 0;
objects = newObjects;
} else {
if (tail == objects.length) {
// shift down |-----AAAAAAA|
tail -= head;
System.arraycopy(objects, head, objects, 0, tail);
head = 0;
}
if (tail == head || compareTimeouts(objects[tail - 1], elem) <= 0) {
// Append
objects[tail++] = elem;
} else if (head > 0 && compareTimeouts(objects[head], elem) > 0) {
// Prepend
objects[--head] = elem;
} else {
// Insert in the middle
int index = upperBound(head, tail - 1, elem);
System.arraycopy(objects, index, objects, index + 1, tail - index);
objects[index] = elem;
tail++;
}
}
}
private E removeFirst() {
E elem = objects[head];
objects[head] = null;
head = (head + 1) % objects.length;
if (head == 0) tail = 0;
return elem;
}
private int upperBound(int start, int end, E key) {
while (start < end) {
int mid = (start + end) >>> 1;
E mitem = objects[mid];
int cmp = compareTimeouts(mitem, key);
if (cmp > 0) {
end = mid;
} else {
start = mid + 1;
}
}
return start;
}
private int compareTimeouts(final E a, final E b) {
long t1 = getNanosTimeout(a);
long t2 = getNanosTimeout(b);
return (t1 < t2) ? -1 : (t1 > t2) ? 1 : 0;
}
private long getNanosTimeout(final E obj) {
if (obj == null) return 0;
TimeUnit unit = timeoutRetriever.getTimeUnit(obj);
long timeout = timeoutRetriever.getTimeout(obj);
return unit.toNanos(timeout);
}
}

View File

@ -1,159 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
@Category({MasterTests.class, MediumTests.class})
public class TestTimeoutBlockingQueue {
@Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
withLookingForStuckThread(true).build();
static class TestObject {
private long timeout;
private int seqId;
public TestObject(int seqId, long timeout) {
this.timeout = timeout;
this.seqId = seqId;
}
public long getTimeout() {
return timeout;
}
public String toString() {
return String.format("(%03d, %03d)", seqId, timeout);
}
}
static class TestObjectTimeoutRetriever implements TimeoutRetriever<TestObject> {
@Override
public long getTimeout(TestObject obj) {
return obj.getTimeout();
}
@Override
public TimeUnit getTimeUnit(TestObject obj) {
return TimeUnit.MILLISECONDS;
}
}
@Test
public void testOrder() {
TimeoutBlockingQueue<TestObject> queue =
new TimeoutBlockingQueue<TestObject>(8, new TestObjectTimeoutRetriever());
long[] timeouts = new long[] {500, 200, 700, 300, 600, 600, 200, 800, 500};
for (int i = 0; i < timeouts.length; ++i) {
for (int j = 0; j <= i; ++j) {
queue.add(new TestObject(j, timeouts[j]));
queue.dump();
}
long prev = 0;
for (int j = 0; j <= i; ++j) {
TestObject obj = queue.poll();
assertTrue(obj.getTimeout() >= prev);
prev = obj.getTimeout();
queue.dump();
}
}
}
@Test
public void testTimeoutBlockingQueue() {
TimeoutBlockingQueue<TestObject> queue;
int[][] testArray = new int[][] {
{200, 400, 600}, // append
{200, 400, 100}, // prepend
{200, 400, 300}, // insert
};
for (int i = 0; i < testArray.length; ++i) {
int[] sortedArray = Arrays.copyOf(testArray[i], testArray[i].length);
Arrays.sort(sortedArray);
// test with head == 0
queue = new TimeoutBlockingQueue<TestObject>(2, new TestObjectTimeoutRetriever());
for (int j = 0; j < testArray[i].length; ++j) {
queue.add(new TestObject(j, testArray[i][j]));
queue.dump();
}
for (int j = 0; !queue.isEmpty(); ++j) {
assertEquals(sortedArray[j], queue.poll().getTimeout());
}
queue = new TimeoutBlockingQueue<TestObject>(2, new TestObjectTimeoutRetriever());
queue.add(new TestObject(0, 50));
assertEquals(50, queue.poll().getTimeout());
// test with head > 0
for (int j = 0; j < testArray[i].length; ++j) {
queue.add(new TestObject(j, testArray[i][j]));
queue.dump();
}
for (int j = 0; !queue.isEmpty(); ++j) {
assertEquals(sortedArray[j], queue.poll().getTimeout());
}
}
}
@Test
public void testRemove() {
TimeoutBlockingQueue<TestObject> queue =
new TimeoutBlockingQueue<TestObject>(2, new TestObjectTimeoutRetriever());
final int effectiveLen = 5;
TestObject[] objs = new TestObject[6];
for (int i = 0; i < effectiveLen; ++i) {
objs[i] = new TestObject(0, i * 10);
queue.add(objs[i]);
}
objs[effectiveLen] = new TestObject(0, effectiveLen * 10);
queue.dump();
for (int i = 0; i < effectiveLen; i += 2) {
assertTrue(queue.remove(objs[i]));
}
assertTrue(!queue.remove(objs[effectiveLen]));
for (int i = 0; i < effectiveLen; ++i) {
TestObject x = queue.poll();
assertEquals((i % 2) == 0 ? null : objs[i], x);
}
}
}