HBASE-15100 Master WALProcs are deleted out of order ending up with older wals not removed

This commit is contained in:
Matteo Bertozzi 2016-01-22 15:57:12 -08:00
parent 4681827d63
commit 772f30fe2a
10 changed files with 201 additions and 92 deletions

View File

@ -41,12 +41,12 @@ public class ProcedureInfo implements Cloneable {
private final String procOwner;
private final ProcedureState procState;
private final long parentId;
private final NonceKey nonceKey;
private final ForeignExceptionMessage exception;
private final long lastUpdate;
private final long startTime;
private final byte[] result;
private NonceKey nonceKey = null;
private long clientAckTime = -1;
public ProcedureInfo(
@ -55,6 +55,7 @@ public class ProcedureInfo implements Cloneable {
final String procOwner,
final ProcedureState procState,
final long parentId,
final NonceKey nonceKey,
final ForeignExceptionMessage exception,
final long lastUpdate,
final long startTime,
@ -64,6 +65,7 @@ public class ProcedureInfo implements Cloneable {
this.procOwner = procOwner;
this.procState = procState;
this.parentId = parentId;
this.nonceKey = nonceKey;
this.lastUpdate = lastUpdate;
this.startTime = startTime;
@ -75,8 +77,8 @@ public class ProcedureInfo implements Cloneable {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="CN_IDIOM_NO_SUPER_CALL",
justification="Intentional; calling super class clone doesn't make sense here.")
public ProcedureInfo clone() {
return new ProcedureInfo(
procId, procName, procOwner, procState, parentId, exception, lastUpdate, startTime, result);
return new ProcedureInfo(procId, procName, procOwner, procState, parentId, nonceKey,
exception, lastUpdate, startTime, result);
}
public long getProcId() {
@ -107,10 +109,6 @@ public class ProcedureInfo implements Cloneable {
return nonceKey;
}
public void setNonceKey(NonceKey nonceKey) {
this.nonceKey = nonceKey;
}
public boolean isFailed() {
return exception != null;
}
@ -218,12 +216,18 @@ public class ProcedureInfo implements Cloneable {
*/
@InterfaceAudience.Private
public static ProcedureInfo convert(final ProcedureProtos.Procedure procProto) {
NonceKey nonceKey = null;
if (procProto.getNonce() != HConstants.NO_NONCE) {
nonceKey = new NonceKey(procProto.getNonceGroup(), procProto.getNonce());
}
return new ProcedureInfo(
procProto.getProcId(),
procProto.getClassName(),
procProto.getOwner(),
procProto.getState(),
procProto.hasParentId() ? procProto.getParentId() : -1,
nonceKey,
procProto.hasException() ? procProto.getException() : null,
procProto.getLastUpdate(),
procProto.getStartTime(),

View File

@ -640,30 +640,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
*/
@InterfaceAudience.Private
public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) {
RemoteProcedureException exception;
if (proc.hasException()) {
exception = proc.getException();
} else {
exception = null;
}
ProcedureInfo procInfo = new ProcedureInfo(
RemoteProcedureException exception = proc.hasException() ? proc.getException() : null;
return new ProcedureInfo(
proc.getProcId(),
proc.toStringClass(),
proc.getOwner(),
proc.getState(),
proc.hasParent() ? proc.getParentProcId() : -1,
nonceKey,
exception != null ?
RemoteProcedureException.toProto(exception.getSource(), exception.getCause()) : null,
proc.getLastUpdate(),
proc.getStartTime(),
proc.getResult());
if (nonceKey != null) {
procInfo.setNonceKey(nonceKey);
}
return procInfo;
}
/**

View File

@ -308,7 +308,7 @@ public class ProcedureExecutor<TEnvironment> {
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
int corruptedCount = 0;
while (procIter.hasNext()) {
Procedure proc = procIter.next();
ProcedureInfo proc = procIter.nextAsProcedureInfo();
LOG.error("corrupted procedure: " + proc);
corruptedCount++;
}
@ -321,34 +321,61 @@ public class ProcedureExecutor<TEnvironment> {
private void loadProcedures(final ProcedureIterator procIter,
final boolean abortOnCorruption) throws IOException {
final boolean isDebugEnabled = LOG.isDebugEnabled();
// 1. Build the rollback stack
int runnablesCount = 0;
while (procIter.hasNext()) {
Procedure proc = procIter.next();
if (!proc.hasParent() && !proc.isFinished()) {
final NonceKey nonceKey;
final long procId;
if (procIter.isNextCompleted()) {
ProcedureInfo proc = procIter.nextAsProcedureInfo();
nonceKey = proc.getNonceKey();
procId = proc.getProcId();
completed.put(proc.getProcId(), proc);
if (isDebugEnabled) {
LOG.debug("The procedure is completed: " + proc);
}
} else {
Procedure proc = procIter.nextAsProcedure();
nonceKey = proc.getNonceKey();
procId = proc.getProcId();
if (!proc.hasParent()) {
assert !proc.isFinished() : "unexpected finished procedure";
rollbackStack.put(proc.getProcId(), new RootProcedureState());
}
// add the procedure to the map
proc.beforeReplay(getEnvironment());
procedures.put(proc.getProcId(), proc);
// add the nonce to the map
if (proc.getNonceKey() != null) {
nonceKeysToProcIdsMap.put(proc.getNonceKey(), proc.getProcId());
}
if (proc.getState() == ProcedureState.RUNNABLE) {
runnablesCount++;
}
}
// add the nonce to the map
if (nonceKey != null) {
nonceKeysToProcIdsMap.put(nonceKey, procId);
}
}
// 2. Initialize the stacks
ArrayList<Procedure> runnableList = new ArrayList(runnablesCount);
HashSet<Procedure> waitingSet = null;
procIter.reset();
while (procIter.hasNext()) {
Procedure proc = procIter.next();
if (LOG.isDebugEnabled()) {
if (procIter.isNextCompleted()) {
procIter.skipNext();
continue;
}
Procedure proc = procIter.nextAsProcedure();
assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
if (isDebugEnabled) {
LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s",
proc.getState(), proc.hasException(), proc));
}
@ -360,18 +387,6 @@ public class ProcedureExecutor<TEnvironment> {
continue;
}
if (!proc.hasParent() && proc.isFinished()) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("The procedure is completed state=%s isFailed=%s",
proc.getState(), proc.hasException()));
}
assert !rollbackStack.containsKey(proc.getProcId());
procedures.remove(proc.getProcId());
completed.put(proc.getProcId(), Procedure.createProcedureInfo(proc, proc.getNonceKey()));
continue;
}
if (proc.hasParent() && !proc.isFinished()) {
Procedure parent = procedures.get(proc.getParentProcId());
// corrupted procedures are handled later at step 3
@ -850,13 +865,15 @@ public class ProcedureExecutor<TEnvironment> {
break;
}
if (proc.getProcId() == rootProcId && proc.isSuccess()) {
// Finalize the procedure state
if (proc.isSuccess()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Procedure completed in " +
StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
}
// Finalize the procedure state
if (proc.getProcId() == rootProcId) {
procedureFinished(proc);
}
break;
}

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.procedure2.Procedure;
/**
@ -66,12 +67,27 @@ public interface ProcedureStore {
*/
boolean hasNext();
/**
* @return true if the iterator next element is a completed procedure.
*/
boolean isNextCompleted();
/**
* Skip the next procedure
*/
void skipNext();
/**
* Returns the next procedure in the iteration.
* @throws IOException if there was an error fetching/deserializing the procedure
* @return the next procedure in the iteration.
*/
Procedure next() throws IOException;
Procedure nextAsProcedure() throws IOException;
/**
* @return the next procedure in the iteration as ProcedureInfo.
*/
ProcedureInfo nextAsProcedureInfo();
}
/**

View File

@ -414,7 +414,9 @@ public class ProcedureStoreTracker {
node.updateState(procId, isDeleted);
}
public void clear() {
public void reset() {
this.keepDeletes = false;
this.partial = false;
this.map.clear();
resetUpdates();
}
@ -579,11 +581,11 @@ public class ProcedureStoreTracker {
}
public void readFrom(final InputStream stream) throws IOException {
ProcedureProtos.ProcedureStoreTracker data =
reset();
final ProcedureProtos.ProcedureStoreTracker data =
ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream);
map.clear();
for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: data.getNodeList()) {
BitSetNode node = BitSetNode.convert(protoNode);
final BitSetNode node = BitSetNode.convert(protoNode);
map.put(node.getStart(), node);
}
}

View File

@ -65,7 +65,6 @@ public final class ProcedureWALFormat {
}
interface Loader extends ProcedureLoader {
void removeLog(ProcedureWALFile log);
void markCorruptedWAL(ProcedureWALFile log, IOException e);
}

View File

@ -22,9 +22,10 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
@ -79,7 +80,7 @@ public class ProcedureWALFormatReader {
//
// Fast Start: INIT/INSERT record and StackIDs
// ---------------------------------------------
// We have to special record, INIT and INSERT that tracks the first time
// We have two special record, INIT and INSERT that tracks the first time
// the procedure was added to the WAL. We can use that information to be able
// to start procedures before reaching the end of the WAL, or before reading all the WALs.
// but in some cases the WAL with that record can be already gone.
@ -146,10 +147,7 @@ public class ProcedureWALFormatReader {
loader.markCorruptedWAL(log, e);
}
if (localProcedureMap.isEmpty()) {
LOG.info("No active entry found in state log " + log + ". removing it");
loader.removeLog(log);
} else {
if (!localProcedureMap.isEmpty()) {
log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId());
procedureMap.mergeTail(localProcedureMap);
//if (hasFastStartSupport) {
@ -215,6 +213,7 @@ public class ProcedureWALFormatReader {
}
maxProcId = Math.max(maxProcId, entry.getProcId());
localProcedureMap.remove(entry.getProcId());
assert !procedureMap.contains(entry.getProcId());
tracker.setDeleted(entry.getProcId(), true);
}
@ -265,6 +264,20 @@ public class ProcedureWALFormatReader {
public boolean hasParent() { return proto.hasParentId(); }
public boolean isReady() { return ready; }
public boolean isCompleted() {
if (!hasParent()) {
switch (proto.getState()) {
case ROLLEDBACK:
return true;
case FINISHED:
return !proto.hasException();
default:
break;
}
}
return false;
}
public Procedure convert() throws IOException {
if (procedure == null) {
procedure = Procedure.convert(proto);
@ -272,6 +285,10 @@ public class ProcedureWALFormatReader {
return procedure;
}
public ProcedureInfo convertToInfo() {
return ProcedureInfo.convert(proto);
}
@Override
public String toString() {
return "Entry(" + getProcId() + ", parentId=" + getParentId() + ")";
@ -298,13 +315,32 @@ public class ProcedureWALFormatReader {
}
@Override
public Procedure next() throws IOException {
public boolean isNextCompleted() {
return current != null && current.isCompleted();
}
@Override
public void skipNext() {
current = current.replayNext;
}
@Override
public Procedure nextAsProcedure() throws IOException {
try {
return current.convert();
} finally {
current = current.replayNext;
}
}
@Override
public ProcedureInfo nextAsProcedureInfo() {
try {
return current.convertToInfo();
} finally {
current = current.replayNext;
}
}
}
private static class WalProcedureMap {

View File

@ -236,8 +236,13 @@ public class WALProcedureStore extends ProcedureStoreBase {
return storeTracker;
}
public LinkedList<ProcedureWALFile> getActiveLogs() {
return logs;
public ArrayList<ProcedureWALFile> getActiveLogs() {
lock.lock();
try {
return new ArrayList<ProcedureWALFile>(logs);
} finally {
lock.unlock();
}
}
public Set<ProcedureWALFile> getCorruptedLogs() {
@ -296,7 +301,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
// Load the old logs
final ArrayList<ProcedureWALFile> toRemove = new ArrayList<ProcedureWALFile>();
Iterator<ProcedureWALFile> it = logs.descendingIterator();
it.next(); // Skip the current log
try {
@ -316,11 +320,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
loader.handleCorrupted(procIter);
}
@Override
public void removeLog(ProcedureWALFile log) {
toRemove.add(log);
}
@Override
public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
if (corruptedLogs == null) {
@ -331,11 +330,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
});
} finally {
if (!toRemove.isEmpty()) {
for (ProcedureWALFile log: toRemove) {
removeLogFile(log);
}
}
loading.set(false);
}
}
@ -584,6 +578,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
totalSynced = syncSlots(stream, slots, 0, slotIndex);
break;
} catch (Throwable e) {
LOG.warn("unable to sync slots, retry=" + retry);
if (++retry >= maxRetriesBeforeRoll) {
if (logRolled >= maxSyncFailureRoll) {
LOG.error("Sync slots after log roll failed, abort.", e);
@ -627,14 +622,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
private boolean rollWriterOrDie() {
for (int i = 1; i <= rollRetries; ++i) {
for (int i = 0; i < rollRetries; ++i) {
if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
try {
if (rollWriter()) {
return true;
}
} catch (IOException e) {
LOG.warn("Unable to roll the log, attempt=" + i, e);
Threads.sleepWithoutInterrupt(waitBeforeRoll);
LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
}
}
LOG.fatal("Unable to roll the log");
@ -881,7 +877,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
private long getMaxLogId(final FileStatus[] logFiles) {
private static long getMaxLogId(final FileStatus[] logFiles) {
long maxLogId = 0;
if (logFiles != null && logFiles.length > 0) {
for (int i = 0; i < logFiles.length; ++i) {
@ -924,7 +920,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
} catch (IOException e) {
LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
// try the next one...
storeTracker.clear();
storeTracker.reset();
storeTracker.setPartialFlag(true);
}
}

View File

@ -193,7 +193,7 @@ public class TestProcedureStoreTracker {
count++;
}
tracker.clear();
tracker.reset();
}
}
@ -213,7 +213,7 @@ public class TestProcedureStoreTracker {
tracker.setDeleted(i, false);
}
tracker.clear();
tracker.reset();
}
}
}

View File

@ -158,6 +158,56 @@ public class TestWALProcedureStore {
verifyProcIdsOnRestart(procIds);
}
@Test
public void testNoTrailerDoubleRestart() throws Exception {
// log-0001: proc 0, 1 and 2 are inserted
Procedure proc0 = new TestSequentialProcedure();
procStore.insert(proc0, null);
Procedure proc1 = new TestSequentialProcedure();
procStore.insert(proc1, null);
Procedure proc2 = new TestSequentialProcedure();
procStore.insert(proc2, null);
procStore.rollWriterForTesting();
// log-0002: proc 1 deleted
procStore.delete(proc1.getProcId());
procStore.rollWriterForTesting();
// log-0003: proc 2 is update
procStore.update(proc2);
procStore.rollWriterForTesting();
// log-0004: proc 2 deleted
procStore.delete(proc2.getProcId());
// stop the store and remove the trailer
procStore.stop(false);
FileStatus[] logs = fs.listStatus(logDir);
assertEquals(4, logs.length);
for (int i = 0; i < logs.length; ++i) {
corruptLog(logs[i], 4);
}
// Test Load 1
LoadCounter loader = new LoadCounter();
storeRestart(loader);
assertEquals(1, loader.getLoadedCount());
assertEquals(0, loader.getCorruptedCount());
// Test Load 2
assertEquals(5, fs.listStatus(logDir).length);
loader = new LoadCounter();
storeRestart(loader);
assertEquals(1, loader.getLoadedCount());
assertEquals(0, loader.getCorruptedCount());
// remove proc-0
procStore.delete(proc0.getProcId());
procStore.periodicRollForTesting();
assertEquals(1, fs.listStatus(logDir).length);
storeRestart(loader);
}
@Test
public void testCorruptedTrailer() throws Exception {
// Insert something
@ -289,9 +339,9 @@ public class TestWALProcedureStore {
@Override
public void load(ProcedureIterator procIter) throws IOException {
assertTrue(procIter.hasNext());
assertEquals(1, procIter.next().getProcId());
assertEquals(1, procIter.nextAsProcedureInfo().getProcId());
assertTrue(procIter.hasNext());
assertEquals(2, procIter.next().getProcId());
assertEquals(2, procIter.nextAsProcedureInfo().getProcId());
assertFalse(procIter.hasNext());
}
@ -345,16 +395,16 @@ public class TestWALProcedureStore {
@Override
public void load(ProcedureIterator procIter) throws IOException {
assertTrue(procIter.hasNext());
assertEquals(4, procIter.next().getProcId());
assertEquals(4, procIter.nextAsProcedureInfo().getProcId());
// TODO: This will be multiple call once we do fast-start
//assertFalse(procIter.hasNext());
assertTrue(procIter.hasNext());
assertEquals(1, procIter.next().getProcId());
assertEquals(1, procIter.nextAsProcedureInfo().getProcId());
assertTrue(procIter.hasNext());
assertEquals(2, procIter.next().getProcId());
assertEquals(2, procIter.nextAsProcedureInfo().getProcId());
assertTrue(procIter.hasNext());
assertEquals(3, procIter.next().getProcId());
assertEquals(3, procIter.nextAsProcedureInfo().getProcId());
assertFalse(procIter.hasNext());
}
@ -579,7 +629,7 @@ public class TestWALProcedureStore {
@Override
public void load(ProcedureIterator procIter) throws IOException {
while (procIter.hasNext()) {
Procedure proc = procIter.next();
Procedure proc = procIter.nextAsProcedure();
LOG.debug("loading procId=" + proc.getProcId() + ": " + proc);
if (procIds != null) {
assertTrue("procId=" + proc.getProcId() + " unexpected",
@ -592,7 +642,7 @@ public class TestWALProcedureStore {
@Override
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
while (procIter.hasNext()) {
Procedure proc = procIter.next();
Procedure proc = procIter.nextAsProcedure();
LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
corrupted.add(proc);
}