HBASE-16554 Rebuild WAL tracker if trailer is corrupted.

Change-Id: Iecc3347de3de9fc57f57ab5f498aad404d02ec52
This commit is contained in:
Apekshit Sharma 2016-09-17 17:38:40 -07:00
parent c5b8aababe
commit b2eac0da33
6 changed files with 178 additions and 44 deletions

View File

@ -93,6 +93,7 @@ public class ProcedureStoreTracker {
private long[] updated;
/**
* Keeps track of procedure ids which belong to this bitmap's range and have been deleted.
* This represents global state since it's not reset on WAL rolls.
*/
private long[] deleted;
/**
@ -449,8 +450,7 @@ public class ProcedureStoreTracker {
}
}
public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf)
throws IOException {
public void resetToProto(final ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
reset();
for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: trackerProtoBuf.getNodeList()) {
final BitSetNode node = new BitSetNode(protoNode);
@ -536,6 +536,7 @@ public class ProcedureStoreTracker {
BitSetNode node = getOrCreateNode(procId);
assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
node.updateState(procId, isDeleted);
trackProcIds(procId);
}
public void reset() {
@ -545,6 +546,11 @@ public class ProcedureStoreTracker {
resetUpdates();
}
public boolean isUpdated(long procId) {
final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
return entry != null && entry.getValue().contains(procId) && entry.getValue().isUpdated(procId);
}
/**
* If {@link #partial} is false, returns state from the bitmap. If no state is found for
* {@code procId}, returns YES.
@ -583,6 +589,10 @@ public class ProcedureStoreTracker {
}
}
public boolean isPartial() {
return partial;
}
public void setPartialFlag(boolean isPartial) {
if (this.partial && !isPartial) {
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
@ -720,6 +730,7 @@ public class ProcedureStoreTracker {
entry.getValue().dump();
}
}
/**
* Iterates over
* {@link BitSetNode}s in this.map and subtracts with corresponding ones from {@code other}

View File

@ -62,6 +62,7 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
this.logFile = logStatus.getPath();
this.logSize = logStatus.getLen();
this.timestamp = logStatus.getModificationTime();
tracker.setPartialFlag(true);
}
public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader header,
@ -72,6 +73,7 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
this.startPos = startPos;
this.logSize = startPos;
this.timestamp = timestamp;
tracker.setPartialFlag(true);
}
public void open() throws IOException {

View File

@ -25,6 +25,8 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -44,6 +46,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTr
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class ProcedureWALFormat {
private static final Log LOG = LogFactory.getLog(ProcedureWALFormat.class);
static final byte LOG_TYPE_STREAM = 0;
static final byte LOG_TYPE_COMPACTED = 1;
static final byte LOG_TYPE_MAX_VALID = 1;
@ -72,19 +76,21 @@ public final class ProcedureWALFormat {
public static void load(final Iterator<ProcedureWALFile> logs,
final ProcedureStoreTracker tracker, final Loader loader) throws IOException {
ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker);
final ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
tracker.setKeepDeletes(true);
try {
// Ignore the last log which is current active log.
while (logs.hasNext()) {
ProcedureWALFile log = logs.next();
log.open();
try {
reader.read(log, loader);
reader.read(log);
} finally {
log.close();
}
}
reader.finalize(loader);
reader.finish();
// The tracker is now updated with all the procedures read from the logs
tracker.setPartialFlag(false);
tracker.resetUpdates();
@ -246,4 +252,4 @@ public final class ProcedureWALFormat {
}
builder.build().writeDelimitedTo(slot);
}
}
}

View File

@ -101,19 +101,40 @@ public class ProcedureWALFormatReader {
private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024);
private final WalProcedureMap procedureMap = new WalProcedureMap(1024);
//private long compactionLogId;
// private long compactionLogId;
private long maxProcId = 0;
/**
* If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we
* re-build the list of procedures updated in that WAL because we need it for log cleaning
* purpose. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted.
* (see {@link WALProcedureStore#removeInactiveLogs()}).
* However, we don't need deleted part of a WAL's tracker for this purpose, so we don't bother
* re-building it. (To understand why, take a look at
* {@link ProcedureStoreTracker.BitSetNode#subtract(ProcedureStoreTracker.BitSetNode)}).
*/
private ProcedureStoreTracker localTracker;
private final ProcedureWALFormat.Loader loader;
/**
* Global tracker. If set to partial, it will be updated as procedures are loaded from wals,
* otherwise not.
*/
private final ProcedureStoreTracker tracker;
private final boolean hasFastStartSupport;
// private final boolean hasFastStartSupport;
public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) {
public ProcedureWALFormatReader(final ProcedureStoreTracker tracker,
ProcedureWALFormat.Loader loader) {
this.tracker = tracker;
this.loader = loader;
// we support fast-start only if we have a clean shutdown.
this.hasFastStartSupport = !tracker.isEmpty();
// this.hasFastStartSupport = !tracker.isEmpty();
}
public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException {
public void read(final ProcedureWALFile log) throws IOException {
localTracker = log.getTracker().isPartial() ? log.getTracker() : null;
if (localTracker != null) {
LOG.info("Rebuilding tracker for log - " + log);
}
FSDataInputStream stream = log.getStream();
try {
boolean hasMore = true;
@ -121,7 +142,6 @@ public class ProcedureWALFormatReader {
ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
if (entry == null) {
LOG.warn("nothing left to decode. exiting with missing EOF");
hasMore = false;
break;
}
switch (entry.getType()) {
@ -150,9 +170,13 @@ public class ProcedureWALFormatReader {
loader.markCorruptedWAL(log, e);
}
if (localTracker != null) {
localTracker.setPartialFlag(false);
}
if (!localProcedureMap.isEmpty()) {
log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId());
procedureMap.mergeTail(localProcedureMap);
//if (hasFastStartSupport) {
// TODO: Some procedure may be already runnables (see readInitEntry())
// (we can also check the "update map" in the log trackers)
@ -164,7 +188,7 @@ public class ProcedureWALFormatReader {
}
}
public void finalize(ProcedureWALFormat.Loader loader) throws IOException {
public void finish() throws IOException {
// notify the loader about the max proc ID
loader.setMaxProcId(maxProcId);
@ -185,7 +209,12 @@ public class ProcedureWALFormatReader {
LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
}
localProcedureMap.add(proc);
tracker.setDeleted(proc.getProcId(), false);
if (tracker.isPartial()) {
tracker.insert(proc.getProcId());
}
}
if (localTracker != null) {
localTracker.insert(proc.getProcId());
}
}
@ -236,7 +265,13 @@ public class ProcedureWALFormatReader {
maxProcId = Math.max(maxProcId, procId);
localProcedureMap.remove(procId);
assert !procedureMap.contains(procId);
tracker.setDeleted(procId, true);
if (tracker.isPartial()) {
tracker.setDeleted(procId, true);
}
if (localTracker != null) {
// In case there is only delete entry for this procedure in current log.
localTracker.setDeleted(procId, true);
}
}
private boolean isDeleted(final long procId) {
@ -264,7 +299,7 @@ public class ProcedureWALFormatReader {
// unlinkFromLinkList = None
// ==========================================================================
private static class Entry {
// hash-table next
// For bucketed linked lists in hash-table.
protected Entry hashNext;
// child head
protected Entry childHead;
@ -511,6 +546,8 @@ public class ProcedureWALFormatReader {
childUnlinkedHead = other.childUnlinkedHead;
}
}
maxProcId = Math.max(maxProcId, other.maxProcId);
minProcId = Math.max(minProcId, other.minProcId);
other.clear();
}

View File

@ -30,7 +30,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
@ -881,24 +880,22 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
private void closeCurrentLogStream() {
if (stream == null) return;
try {
if (stream != null) {
try {
ProcedureWALFile log = logs.getLast();
log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
log.updateLocalTracker(storeTracker);
long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
log.addToSize(trailerSize);
} catch (IOException e) {
LOG.warn("Unable to write the trailer: " + e.getMessage());
}
stream.close();
}
ProcedureWALFile log = logs.getLast();
log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
log.updateLocalTracker(storeTracker);
long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
log.addToSize(trailerSize);
} catch (IOException e) {
LOG.warn("Unable to write the trailer: " + e.getMessage());
}
try {
stream.close();
} catch (IOException e) {
LOG.error("Unable to close the stream", e);
} finally {
stream = null;
}
stream = null;
}
// ==========================================================================
@ -1058,11 +1055,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
return maxLogId;
}
/**
* If last log's tracker is not null, use it as {@link #storeTracker}.
* Otherwise, set storeTracker as partial, and let {@link ProcedureWALFormatReader} rebuild
* it using entries in the log.
*/
private void initTrackerFromOldLogs() {
// TODO: Load the most recent tracker available
if (logs.isEmpty()) return;
ProcedureWALFile log = logs.getLast();
if (log.getTracker() != null) {
if (!log.getTracker().isPartial()) {
storeTracker.resetTo(log.getTracker());
} else {
storeTracker.reset();
@ -1074,7 +1075,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
* Loads given log file and it's tracker.
*/
private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
if (logFile.getLen() == 0) {
LOG.warn("Remove uninitialized log: " + logFile);
log.removeFile();
@ -1095,20 +1096,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
throw new IOException(msg, e);
}
if (log.isCompacted()) {
try {
log.readTrailer();
} catch (IOException e) {
LOG.warn("Unfinished compacted log: " + logFile, e);
log.removeFile();
return null;
}
}
try {
log.readTracker();
} catch (IOException e) {
log.getTracker().reset();
log.getTracker().setPartialFlag(true);
LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
}
log.close();
return log;
}
}

View File

@ -360,6 +360,88 @@ public class TestWALProcedureStore {
assertEquals(0, loader.getCorruptedCount());
}
void assertUpdated(final ProcedureStoreTracker tracker, Procedure[] procs,
int[] updatedProcs, int[] nonUpdatedProcs) {
for (int index : updatedProcs) {
long procId = procs[index].getProcId();
assertTrue("Procedure id : " + procId, tracker.isUpdated(procId));
}
for (int index : nonUpdatedProcs) {
long procId = procs[index].getProcId();
assertFalse("Procedure id : " + procId, tracker.isUpdated(procId));
}
}
void assertDeleted(final ProcedureStoreTracker tracker, Procedure[] procs,
int[] deletedProcs, int[] nonDeletedProcs) {
for (int index : deletedProcs) {
long procId = procs[index].getProcId();
assertEquals("Procedure id : " + procId,
ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(procId));
}
for (int index : nonDeletedProcs) {
long procId = procs[index].getProcId();
assertEquals("Procedure id : " + procId,
ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(procId));
}
}
@Test
public void testCorruptedTrailersRebuild() throws Exception {
final Procedure[] procs = new Procedure[6];
for (int i = 0; i < procs.length; ++i) {
procs[i] = new TestSequentialProcedure();
}
// Log State (I=insert, U=updated, D=delete)
// | log 1 | log 2 | log 3 |
// 0 | I, D | | |
// 1 | I | | |
// 2 | I | D | |
// 3 | I | U | |
// 4 | | I | D |
// 5 | | | I |
procStore.insert(procs[0], null);
procStore.insert(procs[1], null);
procStore.insert(procs[2], null);
procStore.insert(procs[3], null);
procStore.delete(procs[0], null);
procStore.rollWriterForTesting();
procStore.delete(procs[2], null);
procStore.update(procs[3]);
procStore.insert(procs[4], null);
procStore.rollWriterForTesting();
procStore.delete(procs[4], null);
procStore.insert(procs[5], null);
// Stop the store
procStore.stop(false);
// Remove 4 byte from the trailers
final FileStatus[] logs = fs.listStatus(logDir);
assertEquals(3, logs.length);
for (int i = 0; i < logs.length; ++i) {
corruptLog(logs[i], 4);
}
// Restart the store
final LoadCounter loader = new LoadCounter();
storeRestart(loader);
assertEquals(3, loader.getLoadedCount()); // procs 1, 3 and 5
assertEquals(0, loader.getCorruptedCount());
// Check the Trackers
final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs();
assertEquals(4, walFiles.size());
LOG.info("Checking wal " + walFiles.get(0));
assertUpdated(walFiles.get(0).getTracker(), procs, new int[]{0, 1, 2, 3}, new int[] {4, 5});
LOG.info("Checking wal " + walFiles.get(1));
assertUpdated(walFiles.get(1).getTracker(), procs, new int[]{2, 3, 4}, new int[] {0, 1, 5});
LOG.info("Checking wal " + walFiles.get(2));
assertUpdated(walFiles.get(2).getTracker(), procs, new int[]{4, 5}, new int[] {0, 1, 2, 3});
LOG.info("Checking global tracker ");
assertDeleted(procStore.getStoreTracker(), procs, new int[]{0, 2, 4}, new int[] {1, 3, 5});
}
@Test
public void testCorruptedEntries() throws Exception {
// Insert something