HBASE-13529 Procedure v2 - WAL Improvements
This commit is contained in:
parent
2b392de640
commit
0b86c75220
|
@ -52,6 +52,7 @@ public class ProcedureStoreTracker {
|
||||||
private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
|
private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
|
||||||
private final static int MAX_NODE_SIZE = 4 << ADDRESS_BITS_PER_WORD;
|
private final static int MAX_NODE_SIZE = 4 << ADDRESS_BITS_PER_WORD;
|
||||||
|
|
||||||
|
private final boolean partial;
|
||||||
private long[] updated;
|
private long[] updated;
|
||||||
private long[] deleted;
|
private long[] deleted;
|
||||||
private long start;
|
private long start;
|
||||||
|
@ -88,6 +89,7 @@ public class ProcedureStoreTracker {
|
||||||
deleted[i] = partial ? 0 : WORD_MASK;
|
deleted[i] = partial ? 0 : WORD_MASK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.partial = partial;
|
||||||
updateState(procId, false);
|
updateState(procId, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,6 +97,7 @@ public class ProcedureStoreTracker {
|
||||||
this.start = start;
|
this.start = start;
|
||||||
this.updated = updated;
|
this.updated = updated;
|
||||||
this.deleted = deleted;
|
this.deleted = deleted;
|
||||||
|
this.partial = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void update(final long procId) {
|
public void update(final long procId) {
|
||||||
|
@ -223,17 +226,18 @@ public class ProcedureStoreTracker {
|
||||||
int oldSize = updated.length;
|
int oldSize = updated.length;
|
||||||
|
|
||||||
newBitmap = new long[oldSize + delta];
|
newBitmap = new long[oldSize + delta];
|
||||||
|
for (int i = 0; i < newBitmap.length; ++i) {
|
||||||
|
newBitmap[i] = 0;
|
||||||
|
}
|
||||||
System.arraycopy(updated, 0, newBitmap, offset, oldSize);
|
System.arraycopy(updated, 0, newBitmap, offset, oldSize);
|
||||||
updated = newBitmap;
|
updated = newBitmap;
|
||||||
|
|
||||||
newBitmap = new long[deleted.length + delta];
|
newBitmap = new long[deleted.length + delta];
|
||||||
|
for (int i = 0; i < newBitmap.length; ++i) {
|
||||||
|
newBitmap[i] = partial ? 0 : WORD_MASK;
|
||||||
|
}
|
||||||
System.arraycopy(deleted, 0, newBitmap, offset, oldSize);
|
System.arraycopy(deleted, 0, newBitmap, offset, oldSize);
|
||||||
deleted = newBitmap;
|
deleted = newBitmap;
|
||||||
|
|
||||||
for (int i = 0; i < delta; ++i) {
|
|
||||||
updated[oldSize + i] = 0;
|
|
||||||
deleted[oldSize + i] = WORD_MASK;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void merge(final BitSetNode rightNode) {
|
public void merge(final BitSetNode rightNode) {
|
||||||
|
@ -256,7 +260,7 @@ public class ProcedureStoreTracker {
|
||||||
|
|
||||||
for (int i = 0; i < newSize; ++i) {
|
for (int i = 0; i < newSize; ++i) {
|
||||||
updated[offset + i] = 0;
|
updated[offset + i] = 0;
|
||||||
deleted[offset + i] = WORD_MASK;
|
deleted[offset + i] = partial ? 0 : WORD_MASK;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,9 +21,10 @@ package org.apache.hadoop.hbase.procedure2.store.wal;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.LinkedTransferQueue;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -69,6 +70,12 @@ public class WALProcedureStore implements ProcedureStore {
|
||||||
private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
|
private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
|
||||||
private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
|
private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
|
||||||
|
|
||||||
|
private static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
|
||||||
|
private static final boolean DEFAULT_USE_HSYNC = true;
|
||||||
|
|
||||||
|
private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
|
||||||
|
private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M
|
||||||
|
|
||||||
private final CopyOnWriteArrayList<ProcedureStoreListener> listeners =
|
private final CopyOnWriteArrayList<ProcedureStoreListener> listeners =
|
||||||
new CopyOnWriteArrayList<ProcedureStoreListener>();
|
new CopyOnWriteArrayList<ProcedureStoreListener>();
|
||||||
|
|
||||||
|
@ -86,14 +93,18 @@ public class WALProcedureStore implements ProcedureStore {
|
||||||
private final Path logDir;
|
private final Path logDir;
|
||||||
|
|
||||||
private AtomicBoolean inSync = new AtomicBoolean(false);
|
private AtomicBoolean inSync = new AtomicBoolean(false);
|
||||||
private ArrayBlockingQueue<ByteSlot> slotsCache = null;
|
private LinkedTransferQueue<ByteSlot> slotsCache = null;
|
||||||
private Set<ProcedureWALFile> corruptedLogs = null;
|
private Set<ProcedureWALFile> corruptedLogs = null;
|
||||||
|
private AtomicLong totalSynced = new AtomicLong(0);
|
||||||
private FSDataOutputStream stream = null;
|
private FSDataOutputStream stream = null;
|
||||||
private long totalSynced = 0;
|
private long lastRollTs = 0;
|
||||||
private long flushLogId = 0;
|
private long flushLogId = 0;
|
||||||
private int slotIndex = 0;
|
private int slotIndex = 0;
|
||||||
private Thread syncThread;
|
private Thread syncThread;
|
||||||
private ByteSlot[] slots;
|
private ByteSlot[] slots;
|
||||||
|
|
||||||
|
private long rollThreshold;
|
||||||
|
private boolean useHsync;
|
||||||
private int syncWaitMsec;
|
private int syncWaitMsec;
|
||||||
|
|
||||||
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
|
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
|
||||||
|
@ -112,13 +123,15 @@ public class WALProcedureStore implements ProcedureStore {
|
||||||
|
|
||||||
// Init buffer slots
|
// Init buffer slots
|
||||||
slots = new ByteSlot[numSlots];
|
slots = new ByteSlot[numSlots];
|
||||||
slotsCache = new ArrayBlockingQueue(numSlots, true);
|
slotsCache = new LinkedTransferQueue();
|
||||||
while (slotsCache.remainingCapacity() > 0) {
|
while (slotsCache.size() < numSlots) {
|
||||||
slotsCache.offer(new ByteSlot());
|
slotsCache.offer(new ByteSlot());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tunings
|
// Tunings
|
||||||
|
rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
|
||||||
syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
|
syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
|
||||||
|
useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
|
||||||
|
|
||||||
// Init sync thread
|
// Init sync thread
|
||||||
syncThread = new Thread("WALProcedureStoreSyncThread") {
|
syncThread = new Thread("WALProcedureStoreSyncThread") {
|
||||||
|
@ -297,11 +310,9 @@ public class WALProcedureStore implements ProcedureStore {
|
||||||
|
|
||||||
// Update the store tracker
|
// Update the store tracker
|
||||||
synchronized (storeTracker) {
|
synchronized (storeTracker) {
|
||||||
if (logId == flushLogId) {
|
|
||||||
storeTracker.insert(proc, subprocs);
|
storeTracker.insert(proc, subprocs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void update(final Procedure proc) {
|
public void update(final Procedure proc) {
|
||||||
|
@ -329,8 +340,8 @@ public class WALProcedureStore implements ProcedureStore {
|
||||||
// Update the store tracker
|
// Update the store tracker
|
||||||
boolean removeOldLogs = false;
|
boolean removeOldLogs = false;
|
||||||
synchronized (storeTracker) {
|
synchronized (storeTracker) {
|
||||||
if (logId == flushLogId) {
|
|
||||||
storeTracker.update(proc);
|
storeTracker.update(proc);
|
||||||
|
if (logId == flushLogId) {
|
||||||
removeOldLogs = storeTracker.isUpdated();
|
removeOldLogs = storeTracker.isUpdated();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -365,9 +376,9 @@ public class WALProcedureStore implements ProcedureStore {
|
||||||
|
|
||||||
boolean removeOldLogs = false;
|
boolean removeOldLogs = false;
|
||||||
synchronized (storeTracker) {
|
synchronized (storeTracker) {
|
||||||
if (logId == flushLogId) {
|
|
||||||
storeTracker.delete(procId);
|
storeTracker.delete(procId);
|
||||||
if (storeTracker.isEmpty()) {
|
if (logId == flushLogId) {
|
||||||
|
if (storeTracker.isEmpty() && totalSynced.get() > rollThreshold) {
|
||||||
removeOldLogs = rollWriterOrDie(logId + 1);
|
removeOldLogs = rollWriterOrDie(logId + 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -416,8 +427,10 @@ public class WALProcedureStore implements ProcedureStore {
|
||||||
|
|
||||||
// Notify that the slots are full
|
// Notify that the slots are full
|
||||||
if (slotIndex == slots.length) {
|
if (slotIndex == slots.length) {
|
||||||
|
waitCond.signal();
|
||||||
slotCond.signal();
|
slotCond.signal();
|
||||||
}
|
}
|
||||||
|
|
||||||
syncCond.await();
|
syncCond.await();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
@ -436,7 +449,10 @@ public class WALProcedureStore implements ProcedureStore {
|
||||||
// Wait until new data is available
|
// Wait until new data is available
|
||||||
if (slotIndex == 0) {
|
if (slotIndex == 0) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Waiting for data. flushed=" + StringUtils.humanSize(totalSynced));
|
float rollTsSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f;
|
||||||
|
LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
|
||||||
|
StringUtils.humanSize(totalSynced.get()),
|
||||||
|
StringUtils.humanSize(totalSynced.get() / rollTsSec)));
|
||||||
}
|
}
|
||||||
waitCond.await();
|
waitCond.await();
|
||||||
if (slotIndex == 0) {
|
if (slotIndex == 0) {
|
||||||
|
@ -446,10 +462,22 @@ public class WALProcedureStore implements ProcedureStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
|
// Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
|
||||||
|
long syncWaitSt = System.currentTimeMillis();
|
||||||
|
if (slotIndex != slots.length) {
|
||||||
slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
|
slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
|
||||||
|
if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
|
||||||
|
float rollSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f;
|
||||||
|
LOG.trace("sync wait " + StringUtils.humanTimeDiff(syncWaitMs) +
|
||||||
|
" slotIndex=" + slotIndex +
|
||||||
|
" totalSynced=" + StringUtils.humanSize(totalSynced.get()) +
|
||||||
|
" " + StringUtils.humanSize(totalSynced.get() / rollSec) + "/sec");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
inSync.set(true);
|
inSync.set(true);
|
||||||
totalSynced += syncSlots();
|
totalSynced.addAndGet(syncSlots());
|
||||||
slotIndex = 0;
|
slotIndex = 0;
|
||||||
inSync.set(false);
|
inSync.set(false);
|
||||||
syncCond.signalAll();
|
syncCond.signalAll();
|
||||||
|
@ -487,7 +515,13 @@ public class WALProcedureStore implements ProcedureStore {
|
||||||
data.writeTo(stream);
|
data.writeTo(stream);
|
||||||
totalSynced += data.size();
|
totalSynced += data.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (useHsync) {
|
||||||
stream.hsync();
|
stream.hsync();
|
||||||
|
} else {
|
||||||
|
stream.hflush();
|
||||||
|
}
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Sync slots=" + count + '/' + slots.length +
|
LOG.trace("Sync slots=" + count + '/' + slots.length +
|
||||||
" flushed=" + StringUtils.humanSize(totalSynced));
|
" flushed=" + StringUtils.humanSize(totalSynced));
|
||||||
|
@ -541,7 +575,8 @@ public class WALProcedureStore implements ProcedureStore {
|
||||||
}
|
}
|
||||||
stream = newStream;
|
stream = newStream;
|
||||||
flushLogId = logId;
|
flushLogId = logId;
|
||||||
totalSynced = 0;
|
totalSynced.set(0);
|
||||||
|
lastRollTs = System.currentTimeMillis();
|
||||||
logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
|
logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
|
|
@ -27,6 +27,10 @@ public final class StringUtils {
|
||||||
private StringUtils() {}
|
private StringUtils() {}
|
||||||
|
|
||||||
public static String humanTimeDiff(long timeDiff) {
|
public static String humanTimeDiff(long timeDiff) {
|
||||||
|
if (timeDiff < 1000) {
|
||||||
|
return String.format("%dmsec", timeDiff);
|
||||||
|
}
|
||||||
|
|
||||||
StringBuilder buf = new StringBuilder();
|
StringBuilder buf = new StringBuilder();
|
||||||
long hours = timeDiff / (60*60*1000);
|
long hours = timeDiff / (60*60*1000);
|
||||||
long rem = (timeDiff % (60*60*1000));
|
long rem = (timeDiff % (60*60*1000));
|
||||||
|
|
|
@ -210,4 +210,30 @@ public class TestProcedureStoreTracker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDelete() {
|
||||||
|
final ProcedureStoreTracker tracker = new ProcedureStoreTracker();
|
||||||
|
|
||||||
|
long[] procIds = new long[] { 65, 1, 193 };
|
||||||
|
for (int i = 0; i < procIds.length; ++i) {
|
||||||
|
tracker.insert(procIds[i]);
|
||||||
|
tracker.dump();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < (64 * 4); ++i) {
|
||||||
|
boolean hasProc = false;
|
||||||
|
for (int j = 0; j < procIds.length; ++j) {
|
||||||
|
if (procIds[j] == i) {
|
||||||
|
hasProc = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (hasProc) {
|
||||||
|
assertEquals(ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(i));
|
||||||
|
} else {
|
||||||
|
assertEquals("procId=" + i, ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue