HBASE-17148 Procedure v2 - add bulk proc submit (Matteo Bertozzi)

This commit is contained in:
Michael Stack 2016-12-15 16:11:53 -08:00
parent 6acbee179f
commit e16e2a61c1
8 changed files with 140 additions and 13 deletions

View File

@ -58,8 +58,8 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
protected static final long NO_PROC_ID = -1;
protected static final int NO_TIMEOUT = -1;
public static final long NO_PROC_ID = -1;
public static final int NO_TIMEOUT = -1;
// unchanged after initialization
private NonceKey nonceKey = null;

View File

@ -675,25 +675,19 @@ public class ProcedureExecutor<TEnvironment> {
*/
public long submitProcedure(final Procedure proc, final long nonceGroup, final long nonce) {
Preconditions.checkArgument(lastProcId.get() >= 0);
Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
Preconditions.checkArgument(isRunning(), "executor not running");
Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
if (this.checkOwnerSet) {
Preconditions.checkArgument(proc.hasOwner(), "missing owner");
}
// Initialize the Procedure ID
final long currentProcId = nextProcId();
proc.setProcId(currentProcId);
// Prepare procedure
prepareProcedure(proc);
// Check whether the proc exists. If exist, just return the proc id.
// This is to prevent the same proc to submit multiple times (it could happen
// when client could not talk to server and resubmit the same request).
if (nonce != HConstants.NO_NONCE) {
NonceKey noncekey = new NonceKey(nonceGroup, nonce);
final NonceKey noncekey = new NonceKey(nonceGroup, nonce);
proc.setNonceKey(noncekey);
Long oldProcId = nonceKeysToProcIdsMap.putIfAbsent(noncekey, currentProcId);
Long oldProcId = nonceKeysToProcIdsMap.putIfAbsent(noncekey, proc.getProcId());
if (oldProcId != null) {
// Found the proc
return oldProcId.longValue();
@ -706,6 +700,51 @@ public class ProcedureExecutor<TEnvironment> {
LOG.debug("Procedure " + proc + " added to the store.");
}
// Add the procedure to the executor
return pushProcedure(proc);
}
/**
* Add a set of new root-procedure to the executor.
* @param procs the new procedures to execute.
*/
public void submitProcedures(final Procedure[] procs) {
Preconditions.checkArgument(lastProcId.get() >= 0);
Preconditions.checkArgument(isRunning(), "executor not running");
// Prepare procedure
for (int i = 0; i < procs.length; ++i) {
prepareProcedure(procs[i]);
}
// Commit the transaction
store.insert(procs);
if (LOG.isDebugEnabled()) {
LOG.debug("Procedures added to the store: " + Arrays.toString(procs));
}
// Add the procedure to the executor
for (int i = 0; i < procs.length; ++i) {
pushProcedure(procs[i]);
}
}
private void prepareProcedure(final Procedure proc) {
Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
Preconditions.checkArgument(isRunning(), "executor not running");
Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
if (this.checkOwnerSet) {
Preconditions.checkArgument(proc.hasOwner(), "missing owner");
}
// Initialize the Procedure ID
final long currentProcId = nextProcId();
proc.setProcId(currentProcId);
}
private long pushProcedure(final Procedure proc) {
final long currentProcId = proc.getProcId();
// Create the rollback stack for the procedure
RootProcedureState stack = new RootProcedureState();
rollbackStack.put(currentProcId, stack);

View File

@ -61,6 +61,11 @@ public class NoopProcedureStore extends ProcedureStoreBase {
// no-op
}
@Override
public void insert(Procedure[] proc) {
// no-op
}
@Override
public void update(Procedure proc) {
// no-op

View File

@ -174,6 +174,15 @@ public interface ProcedureStore {
*/
void insert(Procedure proc, Procedure[] subprocs);
/**
* Serialize a set of new procedures.
* These procedures are freshly submitted to the executor and each procedure
* has a 'RUNNABLE' state and the initial information required to start up.
*
* @param procs the procedures to serialize and write to the store.
*/
void insert(Procedure[] procs);
/**
* The specified procedure was executed,
* and the new state should be written to the store.

View File

@ -477,6 +477,12 @@ public class ProcedureStoreTracker {
trackProcIds(procId);
}
public void insert(final long[] procIds) {
for (int i = 0; i < procIds.length; ++i) {
insert(procIds[i]);
}
}
public void insert(final long procId, final long[] subProcIds) {
update(procId);
for (int i = 0; i < subProcIds.length; ++i) {

View File

@ -417,6 +417,34 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
@Override
public void insert(final Procedure[] procs) {
if (LOG.isTraceEnabled()) {
LOG.trace("Insert " + Arrays.toString(procs));
}
ByteSlot slot = acquireSlot();
try {
// Serialize the insert
long[] procIds = new long[procs.length];
for (int i = 0; i < procs.length; ++i) {
assert !procs[i].hasParent();
procIds[i] = procs[i].getProcId();
ProcedureWALFormat.writeInsert(slot, procs[i]);
}
// Push the transaction data and wait until it is persisted
pushData(PushType.INSERT, slot, Procedure.NO_PROC_ID, procIds);
} catch (IOException e) {
// We are not able to serialize the procedure.
// this is a code error, and we are not able to go on.
LOG.fatal("Unable to serialize one of the procedure: " + Arrays.toString(procs), e);
throw new RuntimeException(e);
} finally {
releaseSlot(slot);
}
}
@Override
public void update(final Procedure proc) {
if (LOG.isTraceEnabled()) {
@ -513,7 +541,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
// Push the transaction data and wait until it is persisted
pushData(PushType.DELETE, slot, -1, procIds);
pushData(PushType.DELETE, slot, Procedure.NO_PROC_ID, procIds);
} catch (IOException e) {
// We are not able to serialize the procedure.
// this is a code error, and we are not able to go on.
@ -602,6 +630,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
case INSERT:
if (subProcIds == null) {
storeTracker.insert(procId);
} else if (procId == Procedure.NO_PROC_ID) {
storeTracker.insert(subProcIds);
} else {
storeTracker.insert(procId, subProcIds);
}

View File

@ -128,6 +128,25 @@ public class TestProcedureExecutor {
ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId2);
}
@Test
public void testSubmitBatch() throws Exception {
Procedure[] procs = new Procedure[5];
for (int i = 0; i < procs.length; ++i) {
procs[i] = new NoopProcedure<TestProcEnv>();
}
// submit procedures
createNewExecutor(htu.getConfiguration(), 3);
procExecutor.submitProcedures(procs);
// wait for procs to be completed
for (int i = 0; i < procs.length; ++i) {
final long procId = procs[i].getProcId();
ProcedureTestingUtility.waitProcedure(procExecutor, procId);
ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId);
}
}
private int waitThreadCount(final int expectedThreads) {
while (procExecutor.isRunning()) {
if (procExecutor.getWorkerThreadCount() == expectedThreads) {

View File

@ -784,6 +784,25 @@ public class TestWALProcedureStore {
}
}
@Test
public void testBatchInsert() throws Exception {
final int count = 10;
final TestProcedure[] procs = new TestProcedure[count];
for (int i = 0; i < procs.length; ++i) {
procs[i] = new TestProcedure(i + 1);
}
procStore.insert(procs);
restartAndAssert(count, count, 0, 0);
for (int i = 0; i < procs.length; ++i) {
final long procId = procs[i].getProcId();
procStore.delete(procId);
restartAndAssert(procId != count ? count : 0, count - (i + 1), 0, 0);
}
procStore.removeInactiveLogsForTesting();
assertEquals("WALs=" + procStore.getActiveLogs(), 1, procStore.getActiveLogs().size());
}
private LoadCounter restartAndAssert(long maxProcId, long runnableCount,
int completedCount, int corruptedCount) throws Exception {
return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId,