HBASE-16802 Procedure v2 - group procedure cleaning

This commit is contained in:
Matteo Bertozzi 2016-10-11 16:17:09 -07:00
parent eb52e26822
commit 662a1b241f
6 changed files with 132 additions and 8 deletions

View File

@ -138,6 +138,9 @@ public class ProcedureExecutor<TEnvironment> {
private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
private static final int DEFAULT_BATCH_SIZE = 32;
private final Map<Long, ProcedureInfo> completed;
private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
private final ProcedureStore store;
@ -165,6 +168,10 @@ public class ProcedureExecutor<TEnvironment> {
final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
final long[] batchIds = new long[batchSize];
int batchCount = 0;
final long now = EnvironmentEdgeManager.currentTime();
final Iterator<Map.Entry<Long, ProcedureInfo>> it = completed.entrySet().iterator();
@ -179,15 +186,22 @@ public class ProcedureExecutor<TEnvironment> {
if (isDebugEnabled) {
LOG.debug("Evict completed procedure: " + procInfo);
}
store.delete(entry.getKey());
batchIds[batchCount++] = entry.getKey();
if (batchCount == batchIds.length) {
store.delete(batchIds, 0, batchCount);
batchCount = 0;
}
it.remove();
NonceKey nonceKey = procInfo.getNonceKey();
final NonceKey nonceKey = procInfo.getNonceKey();
if (nonceKey != null) {
nonceKeysToProcIdsMap.remove(nonceKey);
}
}
}
if (batchCount > 0) {
store.delete(batchIds, 0, batchCount);
}
}
}

View File

@ -75,4 +75,9 @@ public class NoopProcedureStore extends ProcedureStoreBase {
public void delete(Procedure proc, long[] subprocs) {
// no-op
}
@Override
public void delete(long[] procIds, int offset, int count) {
// no-op
}
}

View File

@ -196,4 +196,14 @@ public interface ProcedureStore {
* @param subProcIds the IDs of the sub-procedure to remove.
*/
void delete(Procedure parentProc, long[] subProcIds);
/**
* The specified procIds were removed from the executor,
* due to completion, abort or failure.
* The store implementor should remove all the information about the specified procIds.
* @param procIds the IDs of the procedures to remove.
* @param offset the array offset from where to start to delete
* @param count the number of IDs to delete
*/
void delete(long[] procIds, int offset, int count);
}

View File

@ -465,6 +465,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
@Override
public void delete(final Procedure proc, final long[] subProcIds) {
assert proc != null : "expected a non-null procedure";
assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds";
if (LOG.isTraceEnabled()) {
LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds));
}
@ -486,6 +488,42 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
@Override
public void delete(final long[] procIds, final int offset, final int count) {
if (count == 0) return;
if (offset == 0 && count == procIds.length) {
delete(procIds);
} else if (count == 1) {
delete(procIds[offset]);
} else {
delete(Arrays.copyOfRange(procIds, offset, offset + count));
}
}
private void delete(final long[] procIds) {
if (LOG.isTraceEnabled()) {
LOG.trace("Delete " + Arrays.toString(procIds));
}
final ByteSlot slot = acquireSlot();
try {
// Serialize the delete
for (int i = 0; i < procIds.length; ++i) {
ProcedureWALFormat.writeDelete(slot, procIds[i]);
}
// Push the transaction data and wait until it is persisted
pushData(PushType.DELETE, slot, -1, procIds);
} catch (IOException e) {
// We are not able to serialize the procedure.
// this is a code error, and we are not able to go on.
LOG.fatal("Unable to serialize the procedures: " + Arrays.toString(procIds), e);
throw new RuntimeException(e);
} finally {
releaseSlot(slot);
}
}
private ByteSlot acquireSlot() {
ByteSlot slot = slotsCache.poll();
return slot != null ? slot : new ByteSlot();

View File

@ -97,7 +97,7 @@ public class ProcedureTestingUtility {
procStore.load(loader);
}
public static void storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
long runnableCount, int completedCount, int corruptedCount) throws Exception {
final LoadCounter loader = new LoadCounter();
storeRestart(procStore, loader);
@ -105,6 +105,7 @@ public class ProcedureTestingUtility {
assertEquals(runnableCount, loader.getRunnableCount());
assertEquals(completedCount, loader.getCompletedCount());
assertEquals(corruptedCount, loader.getCorruptedCount());
return loader;
}
public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
@ -366,6 +367,15 @@ public class ProcedureTestingUtility {
return corrupted.size();
}
public boolean isRunnable(final long procId) {
for (Procedure proc: runnable) {
if (proc.getProcId() == procId) {
return true;
}
}
return false;
}
@Override
public void setMaxProcId(long maxProcId) {
this.maxProcId = maxProcId;

View File

@ -404,13 +404,13 @@ public class TestWALProcedureStore {
procStore.insert(procs[1], null);
procStore.insert(procs[2], null);
procStore.insert(procs[3], null);
procStore.delete(procs[0], null);
procStore.delete(procs[0].getProcId());
procStore.rollWriterForTesting();
procStore.delete(procs[2], null);
procStore.delete(procs[2].getProcId());
procStore.update(procs[3]);
procStore.insert(procs[4], null);
procStore.rollWriterForTesting();
procStore.delete(procs[4], null);
procStore.delete(procs[4].getProcId());
procStore.insert(procs[5], null);
// Stop the store
@ -737,9 +737,56 @@ public class TestWALProcedureStore {
restartAndAssert(3, 0, 1, 0);
}
private void restartAndAssert(long maxProcId, long runnableCount,
@Test
public void testBatchDelete() throws Exception {
for (int i = 1; i < 10; ++i) {
procStore.insert(new TestProcedure(i), null);
}
// delete nothing
long[] toDelete = new long[] { 1, 2, 3, 4 };
procStore.delete(toDelete, 2, 0);
LoadCounter loader = restartAndAssert(9, 9, 0, 0);
for (int i = 1; i < 10; ++i) {
assertEquals(true, loader.isRunnable(i));
}
// delete the full "toDelete" array (2, 4, 6, 8)
toDelete = new long[] { 2, 4, 6, 8 };
procStore.delete(toDelete, 0, toDelete.length);
loader = restartAndAssert(9, 5, 0, 0);
for (int i = 1; i < 10; ++i) {
assertEquals(i % 2 != 0, loader.isRunnable(i));
}
// delete a slice of "toDelete" (1, 3)
toDelete = new long[] { 5, 7, 1, 3, 9 };
procStore.delete(toDelete, 2, 2);
loader = restartAndAssert(9, 3, 0, 0);
for (int i = 1; i < 10; ++i) {
assertEquals(i > 3 && i % 2 != 0, loader.isRunnable(i));
}
// delete a single item (5)
toDelete = new long[] { 5 };
procStore.delete(toDelete, 0, 1);
loader = restartAndAssert(9, 2, 0, 0);
for (int i = 1; i < 10; ++i) {
assertEquals(i > 5 && i % 2 != 0, loader.isRunnable(i));
}
// delete remaining using a slice of "toDelete" (7, 9)
toDelete = new long[] { 0, 7, 9 };
procStore.delete(toDelete, 1, 2);
loader = restartAndAssert(0, 0, 0, 0);
for (int i = 1; i < 10; ++i) {
assertEquals(false, loader.isRunnable(i));
}
}
private LoadCounter restartAndAssert(long maxProcId, long runnableCount,
int completedCount, int corruptedCount) throws Exception {
ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId,
return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId,
runnableCount, completedCount, corruptedCount);
}