HBASE-15579 Procedure v2 - Remove synchronized around nonce in Procedure submit

This commit is contained in:
Matteo Bertozzi 2016-04-22 10:13:13 -07:00
parent 57e1dbc8a6
commit bfca2a4606
2 changed files with 41 additions and 26 deletions

View File

@ -635,34 +635,23 @@ public class ProcedureExecutor<TEnvironment> {
Preconditions.checkArgument(lastProcId.get() >= 0);
Preconditions.checkArgument(!proc.hasParent());
Long currentProcId;
// Initialize the Procedure ID
long currentProcId = nextProcId();
proc.setProcId(currentProcId);
// The following part of the code has to be synchronized to prevent multiple request
// with the same nonce to execute at the same time.
synchronized (this) {
// Check whether the proc exists. If exist, just return the proc id.
// This is to prevent the same proc to submit multiple times (it could happen
// when client could not talk to server and resubmit the same request).
NonceKey noncekey = null;
if (nonce != HConstants.NO_NONCE) {
noncekey = new NonceKey(nonceGroup, nonce);
currentProcId = nonceKeysToProcIdsMap.get(noncekey);
if (currentProcId != null) {
// Found the proc
return currentProcId;
}
}
// Initialize the Procedure ID
currentProcId = nextProcId();
proc.setProcId(currentProcId);
// This is new procedure. Set the noncekey and insert into the map.
if (noncekey != null) {
NonceKey noncekey = new NonceKey(nonceGroup, nonce);
proc.setNonceKey(noncekey);
nonceKeysToProcIdsMap.put(noncekey, currentProcId);
Long oldProcId = nonceKeysToProcIdsMap.putIfAbsent(noncekey, currentProcId);
if (oldProcId != null) {
// Found the proc
return oldProcId.longValue();
}
}
} // end of synchronized (this)
// Commit the transaction
store.insert(proc, null);

View File

@ -207,6 +207,32 @@ public class TestWALProcedureStore {
storeRestart(loader);
}
@Test
public void testProcIdHoles() throws Exception {
// Insert
for (int i = 0; i < 100; i += 2) {
procStore.insert(new TestProcedure(i), null);
if (i > 0 && (i % 10) == 0) {
LoadCounter loader = new LoadCounter();
storeRestart(loader);
assertEquals(0, loader.getCorruptedCount());
assertEquals((i / 2) + 1, loader.getLoadedCount());
}
}
assertEquals(10, procStore.getActiveLogs().size());
// Delete
for (int i = 0; i < 100; i += 2) {
procStore.delete(i);
}
assertEquals(1, procStore.getActiveLogs().size());
LoadCounter loader = new LoadCounter();
storeRestart(loader);
assertEquals(0, loader.getLoadedCount());
assertEquals(0, loader.getCorruptedCount());
}
@Test
public void testCorruptedTrailer() throws Exception {
// Insert something