diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 68d16a0011b..cb4ee477268 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -58,8 +58,8 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceAudience.Private @InterfaceStability.Evolving public abstract class Procedure implements Comparable { - protected static final long NO_PROC_ID = -1; - protected static final int NO_TIMEOUT = -1; + public static final long NO_PROC_ID = -1; + public static final int NO_TIMEOUT = -1; // unchanged after initialization private NonceKey nonceKey = null; diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 2e2dbdf1c12..fe5982ca7a2 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -675,25 +675,19 @@ public class ProcedureExecutor { */ public long submitProcedure(final Procedure proc, final long nonceGroup, final long nonce) { Preconditions.checkArgument(lastProcId.get() >= 0); - Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); Preconditions.checkArgument(isRunning(), "executor not running"); - Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); - if (this.checkOwnerSet) { - Preconditions.checkArgument(proc.hasOwner(), "missing owner"); - } - // Initialize the Procedure ID - final long currentProcId = nextProcId(); - proc.setProcId(currentProcId); + // Prepare procedure + prepareProcedure(proc); // Check whether the proc exists. If exist, just return the proc id. // This is to prevent the same proc to submit multiple times (it could happen // when client could not talk to server and resubmit the same request). if (nonce != HConstants.NO_NONCE) { - NonceKey noncekey = new NonceKey(nonceGroup, nonce); + final NonceKey noncekey = new NonceKey(nonceGroup, nonce); proc.setNonceKey(noncekey); - Long oldProcId = nonceKeysToProcIdsMap.putIfAbsent(noncekey, currentProcId); + Long oldProcId = nonceKeysToProcIdsMap.putIfAbsent(noncekey, proc.getProcId()); if (oldProcId != null) { // Found the proc return oldProcId.longValue(); @@ -706,6 +700,51 @@ public class ProcedureExecutor { LOG.debug("Procedure " + proc + " added to the store."); } + // Add the procedure to the executor + return pushProcedure(proc); + } + + /** + * Add a set of new root-procedure to the executor. + * @param procs the new procedures to execute. + */ + public void submitProcedures(final Procedure[] procs) { + Preconditions.checkArgument(lastProcId.get() >= 0); + Preconditions.checkArgument(isRunning(), "executor not running"); + + // Prepare procedure + for (int i = 0; i < procs.length; ++i) { + prepareProcedure(procs[i]); + } + + // Commit the transaction + store.insert(procs); + if (LOG.isDebugEnabled()) { + LOG.debug("Procedures added to the store: " + Arrays.toString(procs)); + } + + // Add the procedure to the executor + for (int i = 0; i < procs.length; ++i) { + pushProcedure(procs[i]); + } + } + + private void prepareProcedure(final Procedure proc) { + Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); + Preconditions.checkArgument(isRunning(), "executor not running"); + Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); + if (this.checkOwnerSet) { + Preconditions.checkArgument(proc.hasOwner(), "missing owner"); + } + + // Initialize the Procedure ID + final long currentProcId = nextProcId(); + proc.setProcId(currentProcId); + } + + private long pushProcedure(final Procedure proc) { + final long currentProcId = proc.getProcId(); + // Create the rollback stack for the procedure RootProcedureState stack = new RootProcedureState(); rollbackStack.put(currentProcId, stack); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java index 82ef8f0b702..f248dc37c87 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java @@ -61,6 +61,11 @@ public class NoopProcedureStore extends ProcedureStoreBase { // no-op } + @Override + public void insert(Procedure[] proc) { + // no-op + } + @Override public void update(Procedure proc) { // no-op diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 7df52263101..e47ed63d6a2 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -174,6 +174,15 @@ public interface ProcedureStore { */ void insert(Procedure proc, Procedure[] subprocs); + /** + * Serialize a set of new procedures. + * These procedures are freshly submitted to the executor and each procedure + * has a 'RUNNABLE' state and the initial information required to start up. + * + * @param procs the procedures to serialize and write to the store. + */ + void insert(Procedure[] procs); + /** * The specified procedure was executed, * and the new state should be written to the store. diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 4fea0d46406..7ba72f2554a 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -477,6 +477,12 @@ public class ProcedureStoreTracker { trackProcIds(procId); } + public void insert(final long[] procIds) { + for (int i = 0; i < procIds.length; ++i) { + insert(procIds[i]); + } + } + public void insert(final long procId, final long[] subProcIds) { update(procId); for (int i = 0; i < subProcIds.length; ++i) { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 3a46f8ff975..3884e398bad 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -417,6 +417,34 @@ public class WALProcedureStore extends ProcedureStoreBase { } } + @Override + public void insert(final Procedure[] procs) { + if (LOG.isTraceEnabled()) { + LOG.trace("Insert " + Arrays.toString(procs)); + } + + ByteSlot slot = acquireSlot(); + try { + // Serialize the insert + long[] procIds = new long[procs.length]; + for (int i = 0; i < procs.length; ++i) { + assert !procs[i].hasParent(); + procIds[i] = procs[i].getProcId(); + ProcedureWALFormat.writeInsert(slot, procs[i]); + } + + // Push the transaction data and wait until it is persisted + pushData(PushType.INSERT, slot, Procedure.NO_PROC_ID, procIds); + } catch (IOException e) { + // We are not able to serialize the procedure. + // this is a code error, and we are not able to go on. + LOG.fatal("Unable to serialize one of the procedure: " + Arrays.toString(procs), e); + throw new RuntimeException(e); + } finally { + releaseSlot(slot); + } + } + @Override public void update(final Procedure proc) { if (LOG.isTraceEnabled()) { @@ -513,7 +541,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } // Push the transaction data and wait until it is persisted - pushData(PushType.DELETE, slot, -1, procIds); + pushData(PushType.DELETE, slot, Procedure.NO_PROC_ID, procIds); } catch (IOException e) { // We are not able to serialize the procedure. // this is a code error, and we are not able to go on. @@ -602,6 +630,8 @@ public class WALProcedureStore extends ProcedureStoreBase { case INSERT: if (subProcIds == null) { storeTracker.insert(procId); + } else if (procId == Procedure.NO_PROC_ID) { + storeTracker.insert(subProcIds); } else { storeTracker.insert(procId, subProcIds); } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java index 851dc3ea5f4..289987be8b4 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java @@ -128,6 +128,25 @@ public class TestProcedureExecutor { ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId2); } + @Test + public void testSubmitBatch() throws Exception { + Procedure[] procs = new Procedure[5]; + for (int i = 0; i < procs.length; ++i) { + procs[i] = new NoopProcedure(); + } + + // submit procedures + createNewExecutor(htu.getConfiguration(), 3); + procExecutor.submitProcedures(procs); + + // wait for procs to be completed + for (int i = 0; i < procs.length; ++i) { + final long procId = procs[i].getProcId(); + ProcedureTestingUtility.waitProcedure(procExecutor, procId); + ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId); + } + } + private int waitThreadCount(final int expectedThreads) { while (procExecutor.isRunning()) { if (procExecutor.getWorkerThreadCount() == expectedThreads) { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 7ecffa13bb0..83f481ce839 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -784,6 +784,25 @@ public class TestWALProcedureStore { } } + @Test + public void testBatchInsert() throws Exception { + final int count = 10; + final TestProcedure[] procs = new TestProcedure[count]; + for (int i = 0; i < procs.length; ++i) { + procs[i] = new TestProcedure(i + 1); + } + procStore.insert(procs); + restartAndAssert(count, count, 0, 0); + + for (int i = 0; i < procs.length; ++i) { + final long procId = procs[i].getProcId(); + procStore.delete(procId); + restartAndAssert(procId != count ? count : 0, count - (i + 1), 0, 0); + } + procStore.removeInactiveLogsForTesting(); + assertEquals("WALs=" + procStore.getActiveLogs(), 1, procStore.getActiveLogs().size()); + } + private LoadCounter restartAndAssert(long maxProcId, long runnableCount, int completedCount, int corruptedCount) throws Exception { return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId,