HBASE-17090 Procedure v2 - fast wake if nothing else is running (Matteo Bertozzi)

This commit is contained in:
Michael Stack 2016-12-27 16:19:32 -08:00
parent 306ef83c9c
commit da97569eae
5 changed files with 49 additions and 27 deletions

View File

@ -1536,7 +1536,7 @@ public class ProcedureExecutor<TEnvironment> {
final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (procedure == null) continue; if (procedure == null) continue;
activeExecutorCount.incrementAndGet(); store.setRunningProcedureCount(activeExecutorCount.incrementAndGet());
executionStartTime.set(EnvironmentEdgeManager.currentTime()); executionStartTime.set(EnvironmentEdgeManager.currentTime());
try { try {
if (isTraceEnabled) { if (isTraceEnabled) {
@ -1544,7 +1544,7 @@ public class ProcedureExecutor<TEnvironment> {
} }
executeProcedure(procedure); executeProcedure(procedure);
} finally { } finally {
activeExecutorCount.decrementAndGet(); store.setRunningProcedureCount(activeExecutorCount.decrementAndGet());
lastUpdate = EnvironmentEdgeManager.currentTime(); lastUpdate = EnvironmentEdgeManager.currentTime();
executionStartTime.set(Long.MAX_VALUE); executionStartTime.set(Long.MAX_VALUE);
} }

View File

@ -51,6 +51,11 @@ public class NoopProcedureStore extends ProcedureStoreBase {
return numThreads; return numThreads;
} }
@Override
public void setRunningProcedureCount(final int count) {
// no-op
}
@Override @Override
public void load(final ProcedureLoader loader) throws IOException { public void load(final ProcedureLoader loader) throws IOException {
loader.setMaxProcId(0); loader.setMaxProcId(0);

View File

@ -150,6 +150,12 @@ public interface ProcedureStore {
*/ */
int getNumThreads(); int getNumThreads();
/**
* Set the number of procedure running.
* This can be used, for example, by the store to know how long to wait before a sync.
*/
void setRunningProcedureCount(int count);
/** /**
* Acquire the lease for the procedure store. * Acquire the lease for the procedure store.
*/ */

View File

@ -136,7 +136,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
private LinkedTransferQueue<ByteSlot> slotsCache = null; private LinkedTransferQueue<ByteSlot> slotsCache = null;
private Set<ProcedureWALFile> corruptedLogs = null; private Set<ProcedureWALFile> corruptedLogs = null;
private FSDataOutputStream stream = null; private FSDataOutputStream stream = null;
private int runningProcCount = 1;
private long flushLogId = 0; private long flushLogId = 0;
private int syncMaxSlot = 1;
private int slotIndex = 0; private int slotIndex = 0;
private Thread syncThread; private Thread syncThread;
private ByteSlot[] slots; private ByteSlot[] slots;
@ -198,6 +200,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
// Init buffer slots // Init buffer slots
loading.set(true); loading.set(true);
runningProcCount = numSlots;
syncMaxSlot = numSlots;
slots = new ByteSlot[numSlots]; slots = new ByteSlot[numSlots];
slotsCache = new LinkedTransferQueue(); slotsCache = new LinkedTransferQueue();
while (slotsCache.size() < numSlots) { while (slotsCache.size() < numSlots) {
@ -288,6 +292,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
return slots == null ? 0 : slots.length; return slots == null ? 0 : slots.length;
} }
@Override
public void setRunningProcedureCount(final int count) {
LOG.debug("set running procedure count=" + count + " slots=" + slots.length);
this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length;
}
public ProcedureStoreTracker getStoreTracker() { public ProcedureStoreTracker getStoreTracker() {
return storeTracker; return storeTracker;
} }
@ -623,7 +633,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
throw new RuntimeException("sync aborted", syncException.get()); throw new RuntimeException("sync aborted", syncException.get());
} else if (inSync.get()) { } else if (inSync.get()) {
syncCond.await(); syncCond.await();
} else if (slotIndex == slots.length) { } else if (slotIndex >= syncMaxSlot) {
slotCond.signal(); slotCond.signal();
syncCond.await(); syncCond.await();
} else { } else {
@ -642,7 +652,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
} }
// Notify that the slots are full // Notify that the slots are full
if (slotIndex == slots.length) { if (slotIndex == syncMaxSlot) {
waitCond.signal(); waitCond.signal();
slotCond.signal(); slotCond.signal();
} }
@ -725,8 +735,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
} }
} }
// Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
syncMaxSlot = runningProcCount;
assert syncMaxSlot > 0 : "unexpected syncMaxSlot=" + syncMaxSlot;
final long syncWaitSt = System.currentTimeMillis(); final long syncWaitSt = System.currentTimeMillis();
if (slotIndex != slots.length) { if (slotIndex != syncMaxSlot) {
slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
} }
@ -734,7 +746,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
final long syncWaitMs = currentTs - syncWaitSt; final long syncWaitMs = currentTs - syncWaitSt;
final float rollSec = getMillisFromLastRoll() / 1000.0f; final float rollSec = getMillisFromLastRoll() / 1000.0f;
final float syncedPerSec = totalSyncedToStore / rollSec; final float syncedPerSec = totalSyncedToStore / rollSec;
if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) { if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < syncMaxSlot)) {
LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)", LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
StringUtils.humanTimeDiff(syncWaitMs), slotIndex, StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
StringUtils.humanSize(totalSyncedToStore), StringUtils.humanSize(totalSyncedToStore),
@ -813,27 +825,31 @@ public class WALProcedureStore extends ProcedureStoreBase {
return totalSynced; return totalSynced;
} }
protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count) protected long syncSlots(final FSDataOutputStream stream, final ByteSlot[] slots,
throws IOException { final int offset, final int count) throws IOException {
long totalSynced = 0; long totalSynced = 0;
for (int i = 0; i < count; ++i) { for (int i = 0; i < count; ++i) {
ByteSlot data = slots[offset + i]; final ByteSlot data = slots[offset + i];
data.writeTo(stream); data.writeTo(stream);
totalSynced += data.size(); totalSynced += data.size();
} }
syncStream(stream);
sendPostSyncSignal();
if (LOG.isTraceEnabled()) {
LOG.trace("Sync slots=" + count + '/' + syncMaxSlot +
", flushed=" + StringUtils.humanSize(totalSynced));
}
return totalSynced;
}
protected void syncStream(final FSDataOutputStream stream) throws IOException {
if (useHsync) { if (useHsync) {
stream.hsync(); stream.hsync();
} else { } else {
stream.hflush(); stream.hflush();
} }
sendPostSyncSignal();
if (LOG.isTraceEnabled()) {
LOG.trace("Sync slots=" + count + '/' + slots.length +
", flushed=" + StringUtils.humanSize(totalSynced));
}
return totalSynced;
} }
private boolean rollWriterWithRetries() { private boolean rollWriterWithRetries() {

View File

@ -149,7 +149,7 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
// Start worker threads. // Start worker threads.
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
for (int i = 0; i < numThreads; i++) { for (int i = 0; i < numThreads; i++) {
futures[i] = executor.submit(this.new Worker(start)); futures[i] = executor.submit(new Worker(start));
} }
boolean failure = false; boolean failure = false;
try { try {
@ -197,8 +197,8 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
* If procedure store fails to roll log file (throws IOException), all threads quit, and at * If procedure store fails to roll log file (throws IOException), all threads quit, and at
* least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}. * least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}.
*/ */
class Worker implements Callable<Integer> { private final class Worker implements Callable<Integer> {
final long start; private final long start;
public Worker(long start) { public Worker(long start) {
this.start = start; this.start = start;
@ -243,7 +243,7 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
} }
} }
public class NoSyncWalProcedureStore extends WALProcedureStore { private class NoSyncWalProcedureStore extends WALProcedureStore {
public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs, public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs,
final Path logDir) { final Path logDir) {
super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() { super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() {
@ -255,13 +255,8 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
} }
@Override @Override
protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count) protected void syncStream(FSDataOutputStream stream) {
throws IOException { // no-op
long totalSynced = 0;
for (int i = 0; i < count; ++i) {
totalSynced += slots[offset + i].size();
}
return totalSynced;
} }
} }