HBASE-14712 Increase MasterProcWALs clean up granularity

This commit is contained in:
Matteo Bertozzi 2015-11-09 09:33:05 -08:00
parent 0af651bde5
commit 7b80c803b7
6 changed files with 255 additions and 22 deletions

View File

@ -44,13 +44,16 @@ public class ProcedureStoreTracker {
private boolean keepDeletes = false;
private boolean partial = false;
private long minUpdatedProcId = Long.MAX_VALUE;
private long maxUpdatedProcId = Long.MIN_VALUE;
public enum DeleteState { YES, NO, MAYBE }
public static class BitSetNode {
private final static long WORD_MASK = 0xffffffffffffffffL;
private final static int ADDRESS_BITS_PER_WORD = 6;
private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
private final static int MAX_NODE_SIZE = 4 << ADDRESS_BITS_PER_WORD;
private final static int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD;
private final boolean partial;
private long[] updated;
@ -81,7 +84,7 @@ public class ProcedureStoreTracker {
public BitSetNode(final long procId, final boolean partial) {
start = alignDown(procId);
int count = 2;
int count = 1;
updated = new long[count];
deleted = new long[count];
for (int i = 0; i < count; ++i) {
@ -141,8 +144,7 @@ public class ProcedureStoreTracker {
public boolean isUpdated() {
// TODO: cache the value
for (int i = 0; i < updated.length; ++i) {
long deleteMask = ~deleted[i];
if ((updated[i] & deleteMask) != (WORD_MASK & deleteMask)) {
if ((updated[i] | deleted[i]) != WORD_MASK) {
return false;
}
}
@ -171,6 +173,16 @@ public class ProcedureStoreTracker {
}
}
public void unsetPartialFlag() {
for (int i = 0; i < updated.length; ++i) {
for (int j = 0; j < BITS_PER_WORD; ++j) {
if ((updated[i] & (1L << j)) == 0) {
deleted[i] |= (1L << j);
}
}
}
}
public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder =
ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
@ -360,6 +372,7 @@ public class ProcedureStoreTracker {
public void insert(long procId) {
BitSetNode node = getOrCreateNode(procId);
node.update(procId);
trackProcIds(procId);
}
public void update(long procId) {
@ -369,6 +382,7 @@ public class ProcedureStoreTracker {
BitSetNode node = entry.getValue();
assert node.contains(procId);
node.update(procId);
trackProcIds(procId);
}
public void delete(long procId) {
@ -383,6 +397,21 @@ public class ProcedureStoreTracker {
// TODO: RESET if (map.size() == 1)
map.remove(entry.getKey());
}
trackProcIds(procId);
}
private void trackProcIds(long procId) {
minUpdatedProcId = Math.min(minUpdatedProcId, procId);
maxUpdatedProcId = Math.max(maxUpdatedProcId, procId);
}
public long getUpdatedMinProcId() {
return minUpdatedProcId;
}
public long getUpdatedMaxProcId() {
return maxUpdatedProcId;
}
@InterfaceAudience.Private
@ -394,11 +423,12 @@ public class ProcedureStoreTracker {
public void clear() {
this.map.clear();
resetUpdates();
}
public DeleteState isDeleted(long procId) {
Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
if (entry != null) {
if (entry != null && entry.getValue().contains(procId)) {
BitSetNode node = entry.getValue();
DeleteState state = node.isDeleted(procId);
return partial && !node.isUpdated(procId) ? DeleteState.MAYBE : state;
@ -426,6 +456,11 @@ public class ProcedureStoreTracker {
}
public void setPartialFlag(boolean isPartial) {
if (this.partial && !isPartial) {
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
entry.getValue().unsetPartialFlag();
}
}
this.partial = isPartial;
}
@ -447,10 +482,17 @@ public class ProcedureStoreTracker {
return true;
}
public boolean isTracking(long minId, long maxId) {
// TODO: we can make it more precise, instead of looking just at the block
return map.floorEntry(minId) != null || map.floorEntry(maxId) != null;
}
public void resetUpdates() {
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
entry.getValue().resetUpdates();
}
minUpdatedProcId = Long.MAX_VALUE;
maxUpdatedProcId = Long.MIN_VALUE;
}
public void undeleteAll() {
@ -527,6 +569,8 @@ public class ProcedureStoreTracker {
public void dump() {
System.out.println("map " + map.size());
System.out.println("isUpdated " + isUpdated());
System.out.println("isEmpty " + isEmpty());
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
entry.getValue().dump();
}
@ -550,4 +594,4 @@ public class ProcedureStoreTracker {
map.put(node.getStart(), node);
}
}
}
}

View File

@ -46,6 +46,8 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
private FileSystem fs;
private Path logFile;
private long startPos;
private long minProcId;
private long maxProcId;
public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) {
this.fs = fs;
@ -127,6 +129,19 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
fs.delete(logFile, false);
}
public void setProcIds(long minId, long maxId) {
this.minProcId = minId;
this.maxProcId = maxId;
}
public long getMinProcId() {
return minProcId;
}
public long getMaxProcId() {
return maxProcId;
}
@Override
public int compareTo(final ProcedureWALFile other) {
long diff = header.getLogId() - other.header.getLogId();

View File

@ -150,8 +150,8 @@ public class ProcedureWALFormatReader {
LOG.info("No active entry found in state log " + log + ". removing it");
loader.removeLog(log);
} else {
log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId());
procedureMap.mergeTail(localProcedureMap);
//if (hasFastStartSupport) {
// TODO: Some procedure may be already runnables (see readInitEntry())
// (we can also check the "update map" in the log trackers)
@ -321,6 +321,10 @@ public class ProcedureWALFormatReader {
// pending unlinked children (root not present yet)
private Entry childUnlinkedHead;
// Track ProcId range
private long minProcId = Long.MAX_VALUE;
private long maxProcId = Long.MIN_VALUE;
public WalProcedureMap(int size) {
procedureMap = new Entry[size];
replayOrderHead = null;
@ -330,6 +334,7 @@ public class ProcedureWALFormatReader {
}
public void add(ProcedureProtos.Procedure procProto) {
trackProcIds(procProto.getProcId());
Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId());
boolean isNew = entry.proto == null;
entry.proto = procProto;
@ -345,6 +350,7 @@ public class ProcedureWALFormatReader {
}
public boolean remove(long procId) {
trackProcIds(procId);
Entry entry = removeFromMap(procId);
if (entry != null) {
unlinkFromReplayList(entry);
@ -354,6 +360,19 @@ public class ProcedureWALFormatReader {
return false;
}
private void trackProcIds(long procId) {
minProcId = Math.min(minProcId, procId);
maxProcId = Math.max(maxProcId, procId);
}
public long getMinProcId() {
return minProcId;
}
public long getMaxProcId() {
return maxProcId;
}
public boolean contains(long procId) {
return getProcedure(procId) != null;
}
@ -370,6 +389,8 @@ public class ProcedureWALFormatReader {
replayOrderTail = null;
rootHead = null;
childUnlinkedHead = null;
minProcId = Long.MAX_VALUE;
maxProcId = Long.MIN_VALUE;
}
/*

View File

@ -100,6 +100,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
private final AtomicLong inactiveLogsMaxId = new AtomicLong(0);
private final ReentrantLock lock = new ReentrantLock();
private final Condition waitCond = lock.newCondition();
private final Condition slotCond = lock.newCondition();
@ -225,6 +226,14 @@ public class WALProcedureStore extends ProcedureStoreBase {
return storeTracker;
}
public LinkedList<ProcedureWALFile> getActiveLogs() {
return logs;
}
public Set<ProcedureWALFile> getCorruptedLogs() {
return corruptedLogs;
}
@Override
public void recoverLease() throws IOException {
LOG.info("Starting WAL Procedure Store lease recovery");
@ -386,7 +395,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
if (removeOldLogs) {
removeAllLogs(logId - 1);
setInactiveLogsMaxId(logId - 1);
}
}
@ -426,7 +435,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
if (removeOldLogs) {
removeAllLogs(logId);
setInactiveLogsMaxId(logId);
}
}
@ -499,6 +508,18 @@ public class WALProcedureStore extends ProcedureStoreBase {
return syncException.get() != null;
}
protected void periodicRoll() throws IOException {
long logId;
boolean removeOldLogs;
synchronized (storeTracker) {
logId = flushLogId;
removeOldLogs = storeTracker.isEmpty();
}
if (checkAndTryRoll() && removeOldLogs) {
setInactiveLogsMaxId(logId);
}
}
private void syncLoop() throws Throwable {
inSync.set(false);
lock.lock();
@ -507,6 +528,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
try {
// Wait until new data is available
if (slotIndex == 0) {
removeInactiveLogs();
if (LOG.isTraceEnabled()) {
float rollTsSec = getMillisFromLastRoll() / 1000.0f;
LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
@ -516,8 +539,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
if (slotIndex == 0) {
// no data.. probably a stop()
checkAndTryRoll();
// no data.. probably a stop() or a periodic roll
periodicRoll();
continue;
}
}
@ -724,7 +747,11 @@ public class WALProcedureStore extends ProcedureStoreBase {
try {
if (stream != null) {
try {
ProcedureWALFormat.writeTrailer(stream, storeTracker);
synchronized (storeTracker) {
ProcedureWALFile log = logs.getLast();
log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
ProcedureWALFormat.writeTrailer(stream, storeTracker);
}
} catch (IOException e) {
LOG.warn("Unable to write the trailer: " + e.getMessage());
}
@ -737,21 +764,51 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
private void removeAllLogs(long lastLogId) {
if (logs.size() <= 1) {
assert logs.size() == 1: "Expected at least one active log to be running.";
return;
// ==========================================================================
// Log Files cleaner helpers
// ==========================================================================
private void setInactiveLogsMaxId(long logId) {
long expect = 0;
while (!inactiveLogsMaxId.compareAndSet(expect, logId)) {
expect = inactiveLogsMaxId.get();
if (expect >= logId) {
break;
}
}
}
private void removeInactiveLogs() {
long lastLogId = inactiveLogsMaxId.get();
if (lastLogId != 0) {
removeAllLogs(lastLogId);
inactiveLogsMaxId.compareAndSet(lastLogId, 0);
}
// Verify if the ProcId of the first oldest is still active. if not remove the file.
while (logs.size() > 1) {
ProcedureWALFile log = logs.getFirst();
synchronized (storeTracker) {
if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) {
break;
}
}
removeLogFile(log);
}
}
private void removeAllLogs(long lastLogId) {
if (logs.size() <= 1) return;
if (LOG.isDebugEnabled()) {
LOG.debug("Remove all state logs with ID less than " + lastLogId);
}
do {
while (logs.size() > 1) {
ProcedureWALFile log = logs.getFirst();
if (lastLogId < log.getLogId()) {
break;
}
removeLogFile(log);
} while(!logs.isEmpty());
}
}
private boolean removeLogFile(final ProcedureWALFile log) {
@ -761,6 +818,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
log.removeFile();
logs.remove(log);
LOG.info("Remove log: " + log);
LOG.info("Removed logs: " + logs);
if (logs.size() == 0) { LOG.error("Expected at least one log"); }
assert logs.size() > 0 : "expected at least one log";
} catch (IOException e) {
LOG.error("Unable to remove log: " + log, e);
return false;
@ -768,10 +829,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
return true;
}
public Set<ProcedureWALFile> getCorruptedLogs() {
return corruptedLogs;
}
// ==========================================================================
// FileSystem Log Files helpers
// ==========================================================================

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.procedure2.store;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -124,6 +125,36 @@ public class TestProcedureStoreTracker {
assertEquals(ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(579));
assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, tracker.isDeleted(577));
assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, tracker.isDeleted(580));
tracker.setDeleted(579, true);
tracker.setPartialFlag(false);
assertTrue(tracker.isEmpty());
}
@Test
public void testIsTracking() {
long[][] procIds = new long[][] {{4, 7}, {1024, 1027}, {8192, 8194}};
long[][] checkIds = new long[][] {{2, 8}, {1023, 1025}, {8193, 8191}};
ProcedureStoreTracker tracker = new ProcedureStoreTracker();
for (int i = 0; i < procIds.length; ++i) {
long[] seq = procIds[i];
tracker.insert(seq[0]);
tracker.insert(seq[1]);
}
for (int i = 0; i < procIds.length; ++i) {
long[] check = checkIds[i];
long[] seq = procIds[i];
assertTrue(tracker.isTracking(seq[0], seq[1]));
assertTrue(tracker.isTracking(check[0], check[1]));
tracker.delete(seq[0]);
tracker.delete(seq[1]);
assertFalse(tracker.isTracking(seq[0], seq[1]));
assertFalse(tracker.isTracking(check[0], check[1]));
}
assertTrue(tracker.isEmpty());
}
@Test

View File

@ -27,6 +27,8 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -96,6 +98,15 @@ public class TestWALProcedureStore {
procStore.load(loader);
}
@Test
public void testEmptyRoll() throws Exception {
for (int i = 0; i < 10; ++i) {
procStore.periodicRoll();
}
FileStatus[] status = fs.listStatus(logDir);
assertEquals(1, status.length);
}
@Test
public void testEmptyLogLoad() throws Exception {
LoadCounter loader = new LoadCounter();
@ -354,6 +365,60 @@ public class TestWALProcedureStore {
});
}
@Test
public void testInsertUpdateDelete() throws Exception {
final int NTHREAD = 2;
procStore.stop(false);
fs.delete(logDir, true);
org.apache.hadoop.conf.Configuration conf =
new org.apache.hadoop.conf.Configuration(htu.getConfiguration());
conf.setBoolean("hbase.procedure.store.wal.use.hsync", false);
conf.setInt("hbase.procedure.store.wal.periodic.roll.msec", 10000);
conf.setInt("hbase.procedure.store.wal.roll.threshold", 128 * 1024);
fs.mkdirs(logDir);
procStore = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
procStore.start(NTHREAD);
procStore.recoverLease();
final long LAST_PROC_ID = 9999;
final Thread[] thread = new Thread[NTHREAD];
final AtomicLong procCounter = new AtomicLong((long)Math.round(Math.random() * 100));
for (int i = 0; i < thread.length; ++i) {
thread[i] = new Thread() {
@Override
public void run() {
Random rand = new Random();
TestProcedure proc;
do {
proc = new TestProcedure(procCounter.addAndGet(1));
// Insert
procStore.insert(proc, null);
// Update
for (int i = 0, nupdates = rand.nextInt(10); i <= nupdates; ++i) {
try { Thread.sleep(0, rand.nextInt(15)); } catch (InterruptedException e) {}
procStore.update(proc);
}
// Delete
procStore.delete(proc.getProcId());
} while (proc.getProcId() < LAST_PROC_ID);
}
};
thread[i].start();
}
for (int i = 0; i < thread.length; ++i) {
thread[i].join();
}
procStore.getStoreTracker().dump();
assertTrue(procCounter.get() >= LAST_PROC_ID);
assertTrue(procStore.getStoreTracker().isEmpty());
assertEquals(1, procStore.getActiveLogs().size());
}
private void corruptLog(final FileStatus logFile, final long dropBytes)
throws IOException {
assertTrue(logFile.getLen() > dropBytes);