HBASE-16533 Procedure v2 - Extract chore from the executor
This commit is contained in:
parent
7b95ac117d
commit
ea15522704
|
@ -22,8 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -127,7 +125,8 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
* the master (e.g. master failover) so, if we delay a bit the real deletion of
|
* the master (e.g. master failover) so, if we delay a bit the real deletion of
|
||||||
* the proc result the client will be able to get the result the next try.
|
* the proc result the client will be able to get the result the next try.
|
||||||
*/
|
*/
|
||||||
private static class CompletedProcedureCleaner<TEnvironment> extends Procedure<TEnvironment> {
|
private static class CompletedProcedureCleaner<TEnvironment>
|
||||||
|
extends ProcedureInMemoryChore<TEnvironment> {
|
||||||
private static final Log LOG = LogFactory.getLog(CompletedProcedureCleaner.class);
|
private static final Log LOG = LogFactory.getLog(CompletedProcedureCleaner.class);
|
||||||
|
|
||||||
private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
|
private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
|
||||||
|
@ -148,14 +147,15 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
final Map<Long, ProcedureInfo> completedMap,
|
final Map<Long, ProcedureInfo> completedMap,
|
||||||
final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
|
final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
|
||||||
// set the timeout interval that triggers the periodic-procedure
|
// set the timeout interval that triggers the periodic-procedure
|
||||||
setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
|
super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
|
||||||
this.completed = completedMap;
|
this.completed = completedMap;
|
||||||
this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
|
this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
|
||||||
this.store = store;
|
this.store = store;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void periodicExecute(final TEnvironment env) {
|
@Override
|
||||||
|
protected void periodicExecute(final TEnvironment env) {
|
||||||
if (completed.isEmpty()) {
|
if (completed.isEmpty()) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("No completed procedures to cleanup.");
|
LOG.trace("No completed procedures to cleanup.");
|
||||||
|
@ -189,31 +189,6 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Procedure[] execute(final TEnvironment env) {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void rollback(final TEnvironment env) {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean abort(final TEnvironment env) {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void serializeStateData(final OutputStream stream) {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void deserializeStateData(final InputStream stream) {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -536,9 +511,8 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
threads[i].start();
|
threads[i].start();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add completed cleaner
|
// Add completed cleaner chore
|
||||||
waitingTimeout.add(
|
addChore(new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
|
||||||
new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stop() {
|
public void stop() {
|
||||||
|
@ -626,6 +600,22 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
return procedureLists;
|
return procedureLists;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a chore procedure to the executor
|
||||||
|
* @param chore the chore to add
|
||||||
|
*/
|
||||||
|
public void addChore(final ProcedureInMemoryChore chore) {
|
||||||
|
waitingTimeout.add(chore);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a chore procedure from the executor
|
||||||
|
* @param chore the chore to remove
|
||||||
|
*/
|
||||||
|
public void removeChore(final ProcedureInMemoryChore chore) {
|
||||||
|
waitingTimeout.remove(chore);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a new root-procedure to the executor.
|
* Add a new root-procedure to the executor.
|
||||||
* @param proc the new procedure to execute.
|
* @param proc the new procedure to execute.
|
||||||
|
@ -905,12 +895,12 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
// will have the tracker saying everything is in the last log.
|
// will have the tracker saying everything is in the last log.
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
|
|
||||||
// The CompletedProcedureCleaner is a special case, and it acts as a chore.
|
// The ProcedureInMemoryChore is a special case, and it acts as a chore.
|
||||||
// instead of bringing the Chore class in, we reuse this timeout thread for
|
// instead of bringing the Chore class in, we reuse this timeout thread for
|
||||||
// this special case.
|
// this special case.
|
||||||
if (proc instanceof CompletedProcedureCleaner) {
|
if (proc instanceof ProcedureInMemoryChore) {
|
||||||
try {
|
try {
|
||||||
((CompletedProcedureCleaner)proc).periodicExecute(getEnvironment());
|
((ProcedureInMemoryChore)proc).periodicExecute(getEnvironment());
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e);
|
LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,69 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Special procedure used as a chore.
|
||||||
|
* instead of bringing the Chore class in (dependencies reason),
|
||||||
|
* we reuse the executor timeout thread for this special case.
|
||||||
|
*
|
||||||
|
* The assumption is that procedure is used as hook to dispatch other procedures
|
||||||
|
* or trigger some cleanups. It does not store state in the ProcedureStore.
|
||||||
|
* this is just for in-memory chore executions.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public abstract class ProcedureInMemoryChore<TEnvironment> extends Procedure<TEnvironment> {
|
||||||
|
protected ProcedureInMemoryChore(final int timeoutMsec) {
|
||||||
|
setTimeout(timeoutMsec);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void periodicExecute(final TEnvironment env);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Procedure[] execute(final TEnvironment env) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void rollback(final TEnvironment env) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean abort(final TEnvironment env) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void serializeStateData(final OutputStream stream) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deserializeStateData(final InputStream stream) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
}
|
|
@ -92,6 +92,20 @@ public class TimeoutBlockingQueue<E> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void remove(E e) {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < objects.length; ++i) {
|
||||||
|
if (objects[i] == e) {
|
||||||
|
objects[i] = null;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
|
@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
|
||||||
public E poll() {
|
public E poll() {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
|
@ -210,6 +224,7 @@ public class TimeoutBlockingQueue<E> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getNanosTimeout(final E obj) {
|
private long getNanosTimeout(final E obj) {
|
||||||
|
if (obj == null) return 0;
|
||||||
TimeUnit unit = timeoutRetriever.getTimeUnit(obj);
|
TimeUnit unit = timeoutRetriever.getTimeUnit(obj);
|
||||||
long timeout = timeoutRetriever.getTimeout(obj);
|
long timeout = timeoutRetriever.getTimeout(obj);
|
||||||
return unit.toNanos(timeout);
|
return unit.toNanos(timeout);
|
||||||
|
|
|
@ -0,0 +1,110 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
@Category({MasterTests.class, SmallTests.class})
|
||||||
|
public class TestProcedureInMemoryChore {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestProcedureInMemoryChore.class);
|
||||||
|
|
||||||
|
private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
|
||||||
|
|
||||||
|
private TestProcEnv procEnv;
|
||||||
|
private NoopProcedureStore procStore;
|
||||||
|
private ProcedureExecutor<TestProcEnv> procExecutor;
|
||||||
|
|
||||||
|
private HBaseCommonTestingUtility htu;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
htu = new HBaseCommonTestingUtility();
|
||||||
|
|
||||||
|
procEnv = new TestProcEnv();
|
||||||
|
procStore = new NoopProcedureStore();
|
||||||
|
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
|
||||||
|
procExecutor.testing = new ProcedureExecutor.Testing();
|
||||||
|
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
|
||||||
|
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws IOException {
|
||||||
|
procExecutor.stop();
|
||||||
|
procStore.stop(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChoreAddAndRemove() throws Exception {
|
||||||
|
final int timeoutMSec = 50;
|
||||||
|
final int nCountDown = 5;
|
||||||
|
|
||||||
|
// submit the chore and wait for execution
|
||||||
|
CountDownLatch latch = new CountDownLatch(nCountDown);
|
||||||
|
TestLatchChore chore = new TestLatchChore(timeoutMSec, latch);
|
||||||
|
procExecutor.addChore(chore);
|
||||||
|
latch.await();
|
||||||
|
|
||||||
|
// remove the chore and verify it is no longer executed
|
||||||
|
procExecutor.removeChore(chore);
|
||||||
|
latch = new CountDownLatch(nCountDown);
|
||||||
|
chore.setLatch(latch);
|
||||||
|
latch.await(timeoutMSec * nCountDown, TimeUnit.MILLISECONDS);
|
||||||
|
LOG.info("chore latch count=" + latch.getCount());
|
||||||
|
assertTrue(latch.getCount() > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TestLatchChore extends ProcedureInMemoryChore<TestProcEnv> {
|
||||||
|
private CountDownLatch latch;
|
||||||
|
|
||||||
|
public TestLatchChore(final int timeoutMSec, final CountDownLatch latch) {
|
||||||
|
super(timeoutMSec);
|
||||||
|
setLatch(latch);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLatch(final CountDownLatch latch) {
|
||||||
|
this.latch = latch;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void periodicExecute(final TestProcEnv env) {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class TestProcEnv {
|
||||||
|
}
|
||||||
|
}
|
|
@ -131,4 +131,26 @@ public class TestTimeoutBlockingQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemove() {
|
||||||
|
TimeoutBlockingQueue<TestObject> queue =
|
||||||
|
new TimeoutBlockingQueue<TestObject>(2, new TestObjectTimeoutRetriever());
|
||||||
|
|
||||||
|
TestObject[] objs = new TestObject[5];
|
||||||
|
for (int i = 0; i < objs.length; ++i) {
|
||||||
|
objs[i] = new TestObject(0, i * 10);
|
||||||
|
queue.add(objs[i]);
|
||||||
|
}
|
||||||
|
queue.dump();
|
||||||
|
|
||||||
|
for (int i = 0; i < objs.length; i += 2) {
|
||||||
|
queue.remove(objs[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < objs.length; ++i) {
|
||||||
|
TestObject x = queue.poll();
|
||||||
|
assertEquals((i % 2) == 0 ? null : objs[i], x);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue