diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 152f1317351..3a878ccada5 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -52,6 +52,7 @@ public class ProcedureStoreTracker { private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD; private final static int MAX_NODE_SIZE = 4 << ADDRESS_BITS_PER_WORD; + private final boolean partial; private long[] updated; private long[] deleted; private long start; @@ -88,6 +89,7 @@ public class ProcedureStoreTracker { deleted[i] = partial ? 0 : WORD_MASK; } + this.partial = partial; updateState(procId, false); } @@ -95,6 +97,7 @@ public class ProcedureStoreTracker { this.start = start; this.updated = updated; this.deleted = deleted; + this.partial = false; } public void update(final long procId) { @@ -223,17 +226,18 @@ public class ProcedureStoreTracker { int oldSize = updated.length; newBitmap = new long[oldSize + delta]; + for (int i = 0; i < newBitmap.length; ++i) { + newBitmap[i] = 0; + } System.arraycopy(updated, 0, newBitmap, offset, oldSize); updated = newBitmap; newBitmap = new long[deleted.length + delta]; + for (int i = 0; i < newBitmap.length; ++i) { + newBitmap[i] = partial ? 0 : WORD_MASK; + } System.arraycopy(deleted, 0, newBitmap, offset, oldSize); deleted = newBitmap; - - for (int i = 0; i < delta; ++i) { - updated[oldSize + i] = 0; - deleted[oldSize + i] = WORD_MASK; - } } public void merge(final BitSetNode rightNode) { @@ -256,7 +260,7 @@ public class ProcedureStoreTracker { for (int i = 0; i < newSize; ++i) { updated[offset + i] = 0; - deleted[offset + i] = WORD_MASK; + deleted[offset + i] = partial ? 0 : WORD_MASK; } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 91748fd694d..fe429ea672c 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -21,9 +21,10 @@ package org.apache.hadoop.hbase.procedure2.store.wal; import java.io.IOException; import java.io.FileNotFoundException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.Arrays; @@ -69,6 +70,12 @@ public class WALProcedureStore implements ProcedureStore { private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec"; private static final int DEFAULT_SYNC_WAIT_MSEC = 100; + private static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync"; + private static final boolean DEFAULT_USE_HSYNC = true; + + private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold"; + private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M + private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList(); @@ -86,14 +93,18 @@ public class WALProcedureStore implements ProcedureStore { private final Path logDir; private AtomicBoolean inSync = new AtomicBoolean(false); - private ArrayBlockingQueue slotsCache = null; + private LinkedTransferQueue slotsCache = null; private Set corruptedLogs = null; + private AtomicLong totalSynced = new AtomicLong(0); private FSDataOutputStream stream = null; - private long totalSynced = 0; + private long lastRollTs = 0; private long flushLogId = 0; private int slotIndex = 0; private Thread syncThread; private ByteSlot[] slots; + + private long rollThreshold; + private boolean useHsync; private int syncWaitMsec; public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir, @@ -112,13 +123,15 @@ public class WALProcedureStore implements ProcedureStore { // Init buffer slots slots = new ByteSlot[numSlots]; - slotsCache = new ArrayBlockingQueue(numSlots, true); - while (slotsCache.remainingCapacity() > 0) { + slotsCache = new LinkedTransferQueue(); + while (slotsCache.size() < numSlots) { slotsCache.offer(new ByteSlot()); } // Tunings + rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD); syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC); + useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC); // Init sync thread syncThread = new Thread("WALProcedureStoreSyncThread") { @@ -297,9 +310,7 @@ public class WALProcedureStore implements ProcedureStore { // Update the store tracker synchronized (storeTracker) { - if (logId == flushLogId) { - storeTracker.insert(proc, subprocs); - } + storeTracker.insert(proc, subprocs); } } @@ -329,8 +340,8 @@ public class WALProcedureStore implements ProcedureStore { // Update the store tracker boolean removeOldLogs = false; synchronized (storeTracker) { + storeTracker.update(proc); if (logId == flushLogId) { - storeTracker.update(proc); removeOldLogs = storeTracker.isUpdated(); } } @@ -365,9 +376,9 @@ public class WALProcedureStore implements ProcedureStore { boolean removeOldLogs = false; synchronized (storeTracker) { + storeTracker.delete(procId); if (logId == flushLogId) { - storeTracker.delete(procId); - if (storeTracker.isEmpty()) { + if (storeTracker.isEmpty() && totalSynced.get() > rollThreshold) { removeOldLogs = rollWriterOrDie(logId + 1); } } @@ -416,8 +427,10 @@ public class WALProcedureStore implements ProcedureStore { // Notify that the slots are full if (slotIndex == slots.length) { + waitCond.signal(); slotCond.signal(); } + syncCond.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -436,7 +449,10 @@ public class WALProcedureStore implements ProcedureStore { // Wait until new data is available if (slotIndex == 0) { if (LOG.isTraceEnabled()) { - LOG.trace("Waiting for data. flushed=" + StringUtils.humanSize(totalSynced)); + float rollTsSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f; + LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)", + StringUtils.humanSize(totalSynced.get()), + StringUtils.humanSize(totalSynced.get() / rollTsSec))); } waitCond.await(); if (slotIndex == 0) { @@ -446,10 +462,22 @@ public class WALProcedureStore implements ProcedureStore { } // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing - slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); + long syncWaitSt = System.currentTimeMillis(); + if (slotIndex != slots.length) { + slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); + } + long syncWaitMs = System.currentTimeMillis() - syncWaitSt; + if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) { + float rollSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f; + LOG.trace("sync wait " + StringUtils.humanTimeDiff(syncWaitMs) + + " slotIndex=" + slotIndex + + " totalSynced=" + StringUtils.humanSize(totalSynced.get()) + + " " + StringUtils.humanSize(totalSynced.get() / rollSec) + "/sec"); + } + inSync.set(true); - totalSynced += syncSlots(); + totalSynced.addAndGet(syncSlots()); slotIndex = 0; inSync.set(false); syncCond.signalAll(); @@ -487,7 +515,13 @@ public class WALProcedureStore implements ProcedureStore { data.writeTo(stream); totalSynced += data.size(); } - stream.hsync(); + + if (useHsync) { + stream.hsync(); + } else { + stream.hflush(); + } + if (LOG.isTraceEnabled()) { LOG.trace("Sync slots=" + count + '/' + slots.length + " flushed=" + StringUtils.humanSize(totalSynced)); @@ -541,7 +575,8 @@ public class WALProcedureStore implements ProcedureStore { } stream = newStream; flushLogId = logId; - totalSynced = 0; + totalSynced.set(0); + lastRollTs = System.currentTimeMillis(); logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos)); } finally { lock.unlock(); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java index 97134c23e43..a2c4713ad45 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java @@ -27,6 +27,10 @@ public final class StringUtils { private StringUtils() {} public static String humanTimeDiff(long timeDiff) { + if (timeDiff < 1000) { + return String.format("%dmsec", timeDiff); + } + StringBuilder buf = new StringBuilder(); long hours = timeDiff / (60*60*1000); long rem = (timeDiff % (60*60*1000)); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java index 9475e67812f..424d9cec05f 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java @@ -210,4 +210,30 @@ public class TestProcedureStoreTracker { } } } + + @Test + public void testDelete() { + final ProcedureStoreTracker tracker = new ProcedureStoreTracker(); + + long[] procIds = new long[] { 65, 1, 193 }; + for (int i = 0; i < procIds.length; ++i) { + tracker.insert(procIds[i]); + tracker.dump(); + } + + for (int i = 0; i < (64 * 4); ++i) { + boolean hasProc = false; + for (int j = 0; j < procIds.length; ++j) { + if (procIds[j] == i) { + hasProc = true; + break; + } + } + if (hasProc) { + assertEquals(ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(i)); + } else { + assertEquals("procId=" + i, ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(i)); + } + } + } }