HBASE-13529 Procedure v2 - WAL Improvements

This commit is contained in:
Matteo Bertozzi 2015-04-22 21:23:52 +01:00
parent 031dd3638c
commit ad2b1d8467
4 changed files with 91 additions and 22 deletions

View File

@ -52,6 +52,7 @@ public class ProcedureStoreTracker {
private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD; private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
private final static int MAX_NODE_SIZE = 4 << ADDRESS_BITS_PER_WORD; private final static int MAX_NODE_SIZE = 4 << ADDRESS_BITS_PER_WORD;
private final boolean partial;
private long[] updated; private long[] updated;
private long[] deleted; private long[] deleted;
private long start; private long start;
@ -88,6 +89,7 @@ public class ProcedureStoreTracker {
deleted[i] = partial ? 0 : WORD_MASK; deleted[i] = partial ? 0 : WORD_MASK;
} }
this.partial = partial;
updateState(procId, false); updateState(procId, false);
} }
@ -95,6 +97,7 @@ public class ProcedureStoreTracker {
this.start = start; this.start = start;
this.updated = updated; this.updated = updated;
this.deleted = deleted; this.deleted = deleted;
this.partial = false;
} }
public void update(final long procId) { public void update(final long procId) {
@ -223,17 +226,18 @@ public class ProcedureStoreTracker {
int oldSize = updated.length; int oldSize = updated.length;
newBitmap = new long[oldSize + delta]; newBitmap = new long[oldSize + delta];
for (int i = 0; i < newBitmap.length; ++i) {
newBitmap[i] = 0;
}
System.arraycopy(updated, 0, newBitmap, offset, oldSize); System.arraycopy(updated, 0, newBitmap, offset, oldSize);
updated = newBitmap; updated = newBitmap;
newBitmap = new long[deleted.length + delta]; newBitmap = new long[deleted.length + delta];
for (int i = 0; i < newBitmap.length; ++i) {
newBitmap[i] = partial ? 0 : WORD_MASK;
}
System.arraycopy(deleted, 0, newBitmap, offset, oldSize); System.arraycopy(deleted, 0, newBitmap, offset, oldSize);
deleted = newBitmap; deleted = newBitmap;
for (int i = 0; i < delta; ++i) {
updated[oldSize + i] = 0;
deleted[oldSize + i] = WORD_MASK;
}
} }
public void merge(final BitSetNode rightNode) { public void merge(final BitSetNode rightNode) {
@ -256,7 +260,7 @@ public class ProcedureStoreTracker {
for (int i = 0; i < newSize; ++i) { for (int i = 0; i < newSize; ++i) {
updated[offset + i] = 0; updated[offset + i] = 0;
deleted[offset + i] = WORD_MASK; deleted[offset + i] = partial ? 0 : WORD_MASK;
} }
} }

View File

@ -21,9 +21,10 @@ package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException; import java.io.IOException;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.Arrays; import java.util.Arrays;
@ -69,6 +70,12 @@ public class WALProcedureStore implements ProcedureStore {
private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec"; private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
private static final int DEFAULT_SYNC_WAIT_MSEC = 100; private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
private static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
private static final boolean DEFAULT_USE_HSYNC = true;
private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M
private final CopyOnWriteArrayList<ProcedureStoreListener> listeners = private final CopyOnWriteArrayList<ProcedureStoreListener> listeners =
new CopyOnWriteArrayList<ProcedureStoreListener>(); new CopyOnWriteArrayList<ProcedureStoreListener>();
@ -86,14 +93,18 @@ public class WALProcedureStore implements ProcedureStore {
private final Path logDir; private final Path logDir;
private AtomicBoolean inSync = new AtomicBoolean(false); private AtomicBoolean inSync = new AtomicBoolean(false);
private ArrayBlockingQueue<ByteSlot> slotsCache = null; private LinkedTransferQueue<ByteSlot> slotsCache = null;
private Set<ProcedureWALFile> corruptedLogs = null; private Set<ProcedureWALFile> corruptedLogs = null;
private AtomicLong totalSynced = new AtomicLong(0);
private FSDataOutputStream stream = null; private FSDataOutputStream stream = null;
private long totalSynced = 0; private long lastRollTs = 0;
private long flushLogId = 0; private long flushLogId = 0;
private int slotIndex = 0; private int slotIndex = 0;
private Thread syncThread; private Thread syncThread;
private ByteSlot[] slots; private ByteSlot[] slots;
private long rollThreshold;
private boolean useHsync;
private int syncWaitMsec; private int syncWaitMsec;
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir, public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
@ -112,13 +123,15 @@ public class WALProcedureStore implements ProcedureStore {
// Init buffer slots // Init buffer slots
slots = new ByteSlot[numSlots]; slots = new ByteSlot[numSlots];
slotsCache = new ArrayBlockingQueue(numSlots, true); slotsCache = new LinkedTransferQueue();
while (slotsCache.remainingCapacity() > 0) { while (slotsCache.size() < numSlots) {
slotsCache.offer(new ByteSlot()); slotsCache.offer(new ByteSlot());
} }
// Tunings // Tunings
rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC); syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
// Init sync thread // Init sync thread
syncThread = new Thread("WALProcedureStoreSyncThread") { syncThread = new Thread("WALProcedureStoreSyncThread") {
@ -297,11 +310,9 @@ public class WALProcedureStore implements ProcedureStore {
// Update the store tracker // Update the store tracker
synchronized (storeTracker) { synchronized (storeTracker) {
if (logId == flushLogId) {
storeTracker.insert(proc, subprocs); storeTracker.insert(proc, subprocs);
} }
} }
}
@Override @Override
public void update(final Procedure proc) { public void update(final Procedure proc) {
@ -329,8 +340,8 @@ public class WALProcedureStore implements ProcedureStore {
// Update the store tracker // Update the store tracker
boolean removeOldLogs = false; boolean removeOldLogs = false;
synchronized (storeTracker) { synchronized (storeTracker) {
if (logId == flushLogId) {
storeTracker.update(proc); storeTracker.update(proc);
if (logId == flushLogId) {
removeOldLogs = storeTracker.isUpdated(); removeOldLogs = storeTracker.isUpdated();
} }
} }
@ -365,9 +376,9 @@ public class WALProcedureStore implements ProcedureStore {
boolean removeOldLogs = false; boolean removeOldLogs = false;
synchronized (storeTracker) { synchronized (storeTracker) {
if (logId == flushLogId) {
storeTracker.delete(procId); storeTracker.delete(procId);
if (storeTracker.isEmpty()) { if (logId == flushLogId) {
if (storeTracker.isEmpty() && totalSynced.get() > rollThreshold) {
removeOldLogs = rollWriterOrDie(logId + 1); removeOldLogs = rollWriterOrDie(logId + 1);
} }
} }
@ -416,8 +427,10 @@ public class WALProcedureStore implements ProcedureStore {
// Notify that the slots are full // Notify that the slots are full
if (slotIndex == slots.length) { if (slotIndex == slots.length) {
waitCond.signal();
slotCond.signal(); slotCond.signal();
} }
syncCond.await(); syncCond.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -436,7 +449,10 @@ public class WALProcedureStore implements ProcedureStore {
// Wait until new data is available // Wait until new data is available
if (slotIndex == 0) { if (slotIndex == 0) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Waiting for data. flushed=" + StringUtils.humanSize(totalSynced)); float rollTsSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f;
LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
StringUtils.humanSize(totalSynced.get()),
StringUtils.humanSize(totalSynced.get() / rollTsSec)));
} }
waitCond.await(); waitCond.await();
if (slotIndex == 0) { if (slotIndex == 0) {
@ -446,10 +462,22 @@ public class WALProcedureStore implements ProcedureStore {
} }
// Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
long syncWaitSt = System.currentTimeMillis();
if (slotIndex != slots.length) {
slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
}
long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
float rollSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f;
LOG.trace("sync wait " + StringUtils.humanTimeDiff(syncWaitMs) +
" slotIndex=" + slotIndex +
" totalSynced=" + StringUtils.humanSize(totalSynced.get()) +
" " + StringUtils.humanSize(totalSynced.get() / rollSec) + "/sec");
}
inSync.set(true); inSync.set(true);
totalSynced += syncSlots(); totalSynced.addAndGet(syncSlots());
slotIndex = 0; slotIndex = 0;
inSync.set(false); inSync.set(false);
syncCond.signalAll(); syncCond.signalAll();
@ -487,7 +515,13 @@ public class WALProcedureStore implements ProcedureStore {
data.writeTo(stream); data.writeTo(stream);
totalSynced += data.size(); totalSynced += data.size();
} }
if (useHsync) {
stream.hsync(); stream.hsync();
} else {
stream.hflush();
}
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Sync slots=" + count + '/' + slots.length + LOG.trace("Sync slots=" + count + '/' + slots.length +
" flushed=" + StringUtils.humanSize(totalSynced)); " flushed=" + StringUtils.humanSize(totalSynced));
@ -541,7 +575,8 @@ public class WALProcedureStore implements ProcedureStore {
} }
stream = newStream; stream = newStream;
flushLogId = logId; flushLogId = logId;
totalSynced = 0; totalSynced.set(0);
lastRollTs = System.currentTimeMillis();
logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos)); logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
} finally { } finally {
lock.unlock(); lock.unlock();

View File

@ -27,6 +27,10 @@ public final class StringUtils {
private StringUtils() {} private StringUtils() {}
public static String humanTimeDiff(long timeDiff) { public static String humanTimeDiff(long timeDiff) {
if (timeDiff < 1000) {
return String.format("%dmsec", timeDiff);
}
StringBuilder buf = new StringBuilder(); StringBuilder buf = new StringBuilder();
long hours = timeDiff / (60*60*1000); long hours = timeDiff / (60*60*1000);
long rem = (timeDiff % (60*60*1000)); long rem = (timeDiff % (60*60*1000));

View File

@ -211,4 +211,30 @@ public class TestProcedureStoreTracker {
} }
} }
} }
@Test
public void testDelete() {
final ProcedureStoreTracker tracker = new ProcedureStoreTracker();
long[] procIds = new long[] { 65, 1, 193 };
for (int i = 0; i < procIds.length; ++i) {
tracker.insert(procIds[i]);
tracker.dump();
}
for (int i = 0; i < (64 * 4); ++i) {
boolean hasProc = false;
for (int j = 0; j < procIds.length; ++j) {
if (procIds[j] == i) {
hasProc = true;
break;
}
}
if (hasProc) {
assertEquals(ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(i));
} else {
assertEquals("procId=" + i, ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(i));
}
}
}
} }