HBASE-13202 Procedure v2 - core framework (addendum)

This commit is contained in:
Matteo Bertozzi 2015-04-15 09:39:25 +01:00
parent 4f151444b5
commit d75326a797
4 changed files with 39 additions and 5 deletions

View File

@ -95,6 +95,7 @@ public class ProcedureFairRunQueues<TKey, TQueue extends ProcedureFairRunQueues.
public void clear() {
lock.lock();
try {
currentQuantum = 0;
current = null;
objMap.clear();
} finally {

View File

@ -195,11 +195,12 @@ public class ProcedureStoreTracker {
// Grow/Merge Helpers
// ========================================================================
public boolean canGrow(final long procId) {
return (procId - start) < MAX_NODE_SIZE;
return Math.abs(procId - start) < MAX_NODE_SIZE;
}
public boolean canMerge(final BitSetNode rightNode) {
return (start + rightNode.getEnd()) < MAX_NODE_SIZE;
assert start < rightNode.getEnd();
return (rightNode.getEnd() - start) < MAX_NODE_SIZE;
}
public void grow(final long procId) {
@ -258,6 +259,11 @@ public class ProcedureStoreTracker {
}
}
@Override
public String toString() {
return "BitSetNode(" + getStart() + "-" + getEnd() + ")";
}
// ========================================================================
// Min/Max Helpers
// ========================================================================
@ -377,6 +383,7 @@ public class ProcedureStoreTracker {
@InterfaceAudience.Private
public void setDeleted(final long procId, final boolean isDeleted) {
BitSetNode node = getOrCreateNode(procId);
assert node.contains(procId) : "expected procId in the node";
node.updateState(procId, isDeleted);
}
@ -507,6 +514,7 @@ public class ProcedureStoreTracker {
}
private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) {
assert leftNode.getStart() < rightNode.getStart();
leftNode.merge(rightNode);
map.remove(rightNode.getStart());
return leftNode;

View File

@ -389,7 +389,7 @@ public class WALProcedureStore implements ProcedureStore {
}
private long pushData(final ByteSlot slot) {
assert !logs.isEmpty() : "recoverLease() must be called before inserting data";
assert isRunning() && !logs.isEmpty() : "recoverLease() must be called before inserting data";
long logId = -1;
lock.lock();
@ -677,7 +677,7 @@ public class WALProcedureStore implements ProcedureStore {
try {
log.readTracker(storeTracker);
} catch (IOException e) {
LOG.error("Unable to read tracker for " + log, e);
LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
// try the next one...
storeTracker.clear();
storeTracker.setPartialFlag(true);

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.procedure2.store;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -165,4 +166,28 @@ public class TestProcedureStoreTracker {
tracker.delete(procs[5].getProcId());
assertTrue(tracker.isEmpty());
}
@Test
public void testRandLoad() {
final int NPROCEDURES = 2500;
final int NRUNS = 5000;
final ProcedureStoreTracker tracker = new ProcedureStoreTracker();
Random rand = new Random(1);
for (int i = 0; i < NRUNS; ++i) {
assertTrue(tracker.isEmpty());
int count = 0;
while (count < NPROCEDURES) {
long procId = rand.nextLong();
if (procId < 1) continue;
tracker.setDeleted(procId, i % 2 == 0);
count++;
}
tracker.clear();
}
}
}