HBASE-15100 Master WALProcs are deleted out of order ending up with older wals not removed
This commit is contained in:
parent
a5228a0b4d
commit
931e1b03ab
|
@ -41,12 +41,12 @@ public class ProcedureInfo implements Cloneable {
|
||||||
private final String procOwner;
|
private final String procOwner;
|
||||||
private final ProcedureState procState;
|
private final ProcedureState procState;
|
||||||
private final long parentId;
|
private final long parentId;
|
||||||
|
private final NonceKey nonceKey;
|
||||||
private final ForeignExceptionMessage exception;
|
private final ForeignExceptionMessage exception;
|
||||||
private final long lastUpdate;
|
private final long lastUpdate;
|
||||||
private final long startTime;
|
private final long startTime;
|
||||||
private final byte[] result;
|
private final byte[] result;
|
||||||
|
|
||||||
private NonceKey nonceKey = null;
|
|
||||||
private long clientAckTime = -1;
|
private long clientAckTime = -1;
|
||||||
|
|
||||||
public ProcedureInfo(
|
public ProcedureInfo(
|
||||||
|
@ -55,6 +55,7 @@ public class ProcedureInfo implements Cloneable {
|
||||||
final String procOwner,
|
final String procOwner,
|
||||||
final ProcedureState procState,
|
final ProcedureState procState,
|
||||||
final long parentId,
|
final long parentId,
|
||||||
|
final NonceKey nonceKey,
|
||||||
final ForeignExceptionMessage exception,
|
final ForeignExceptionMessage exception,
|
||||||
final long lastUpdate,
|
final long lastUpdate,
|
||||||
final long startTime,
|
final long startTime,
|
||||||
|
@ -64,6 +65,7 @@ public class ProcedureInfo implements Cloneable {
|
||||||
this.procOwner = procOwner;
|
this.procOwner = procOwner;
|
||||||
this.procState = procState;
|
this.procState = procState;
|
||||||
this.parentId = parentId;
|
this.parentId = parentId;
|
||||||
|
this.nonceKey = nonceKey;
|
||||||
this.lastUpdate = lastUpdate;
|
this.lastUpdate = lastUpdate;
|
||||||
this.startTime = startTime;
|
this.startTime = startTime;
|
||||||
|
|
||||||
|
@ -75,8 +77,8 @@ public class ProcedureInfo implements Cloneable {
|
||||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="CN_IDIOM_NO_SUPER_CALL",
|
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="CN_IDIOM_NO_SUPER_CALL",
|
||||||
justification="Intentional; calling super class clone doesn't make sense here.")
|
justification="Intentional; calling super class clone doesn't make sense here.")
|
||||||
public ProcedureInfo clone() {
|
public ProcedureInfo clone() {
|
||||||
return new ProcedureInfo(
|
return new ProcedureInfo(procId, procName, procOwner, procState, parentId, nonceKey,
|
||||||
procId, procName, procOwner, procState, parentId, exception, lastUpdate, startTime, result);
|
exception, lastUpdate, startTime, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getProcId() {
|
public long getProcId() {
|
||||||
|
@ -107,10 +109,6 @@ public class ProcedureInfo implements Cloneable {
|
||||||
return nonceKey;
|
return nonceKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setNonceKey(NonceKey nonceKey) {
|
|
||||||
this.nonceKey = nonceKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isFailed() {
|
public boolean isFailed() {
|
||||||
return exception != null;
|
return exception != null;
|
||||||
}
|
}
|
||||||
|
@ -218,12 +216,18 @@ public class ProcedureInfo implements Cloneable {
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public static ProcedureInfo convert(final ProcedureProtos.Procedure procProto) {
|
public static ProcedureInfo convert(final ProcedureProtos.Procedure procProto) {
|
||||||
|
NonceKey nonceKey = null;
|
||||||
|
if (procProto.getNonce() != HConstants.NO_NONCE) {
|
||||||
|
nonceKey = new NonceKey(procProto.getNonceGroup(), procProto.getNonce());
|
||||||
|
}
|
||||||
|
|
||||||
return new ProcedureInfo(
|
return new ProcedureInfo(
|
||||||
procProto.getProcId(),
|
procProto.getProcId(),
|
||||||
procProto.getClassName(),
|
procProto.getClassName(),
|
||||||
procProto.getOwner(),
|
procProto.getOwner(),
|
||||||
procProto.getState(),
|
procProto.getState(),
|
||||||
procProto.hasParentId() ? procProto.getParentId() : -1,
|
procProto.hasParentId() ? procProto.getParentId() : -1,
|
||||||
|
nonceKey,
|
||||||
procProto.hasException() ? procProto.getException() : null,
|
procProto.hasException() ? procProto.getException() : null,
|
||||||
procProto.getLastUpdate(),
|
procProto.getLastUpdate(),
|
||||||
procProto.getStartTime(),
|
procProto.getStartTime(),
|
||||||
|
|
|
@ -640,30 +640,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) {
|
public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) {
|
||||||
RemoteProcedureException exception;
|
RemoteProcedureException exception = proc.hasException() ? proc.getException() : null;
|
||||||
|
return new ProcedureInfo(
|
||||||
if (proc.hasException()) {
|
|
||||||
exception = proc.getException();
|
|
||||||
} else {
|
|
||||||
exception = null;
|
|
||||||
}
|
|
||||||
ProcedureInfo procInfo = new ProcedureInfo(
|
|
||||||
proc.getProcId(),
|
proc.getProcId(),
|
||||||
proc.toStringClass(),
|
proc.toStringClass(),
|
||||||
proc.getOwner(),
|
proc.getOwner(),
|
||||||
proc.getState(),
|
proc.getState(),
|
||||||
proc.hasParent() ? proc.getParentProcId() : -1,
|
proc.hasParent() ? proc.getParentProcId() : -1,
|
||||||
|
nonceKey,
|
||||||
exception != null ?
|
exception != null ?
|
||||||
RemoteProcedureException.toProto(exception.getSource(), exception.getCause()) : null,
|
RemoteProcedureException.toProto(exception.getSource(), exception.getCause()) : null,
|
||||||
proc.getLastUpdate(),
|
proc.getLastUpdate(),
|
||||||
proc.getStartTime(),
|
proc.getStartTime(),
|
||||||
proc.getResult());
|
proc.getResult());
|
||||||
|
|
||||||
if (nonceKey != null) {
|
|
||||||
procInfo.setNonceKey(nonceKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
return procInfo;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -308,7 +308,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
|
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
|
||||||
int corruptedCount = 0;
|
int corruptedCount = 0;
|
||||||
while (procIter.hasNext()) {
|
while (procIter.hasNext()) {
|
||||||
Procedure proc = procIter.next();
|
ProcedureInfo proc = procIter.nextAsProcedureInfo();
|
||||||
LOG.error("corrupted procedure: " + proc);
|
LOG.error("corrupted procedure: " + proc);
|
||||||
corruptedCount++;
|
corruptedCount++;
|
||||||
}
|
}
|
||||||
|
@ -321,24 +321,44 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
|
|
||||||
private void loadProcedures(final ProcedureIterator procIter,
|
private void loadProcedures(final ProcedureIterator procIter,
|
||||||
final boolean abortOnCorruption) throws IOException {
|
final boolean abortOnCorruption) throws IOException {
|
||||||
|
final boolean isDebugEnabled = LOG.isDebugEnabled();
|
||||||
|
|
||||||
// 1. Build the rollback stack
|
// 1. Build the rollback stack
|
||||||
int runnablesCount = 0;
|
int runnablesCount = 0;
|
||||||
while (procIter.hasNext()) {
|
while (procIter.hasNext()) {
|
||||||
Procedure proc = procIter.next();
|
final NonceKey nonceKey;
|
||||||
if (!proc.hasParent() && !proc.isFinished()) {
|
final long procId;
|
||||||
rollbackStack.put(proc.getProcId(), new RootProcedureState());
|
|
||||||
|
if (procIter.isNextCompleted()) {
|
||||||
|
ProcedureInfo proc = procIter.nextAsProcedureInfo();
|
||||||
|
nonceKey = proc.getNonceKey();
|
||||||
|
procId = proc.getProcId();
|
||||||
|
completed.put(proc.getProcId(), proc);
|
||||||
|
if (isDebugEnabled) {
|
||||||
|
LOG.debug("The procedure is completed: " + proc);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Procedure proc = procIter.nextAsProcedure();
|
||||||
|
nonceKey = proc.getNonceKey();
|
||||||
|
procId = proc.getProcId();
|
||||||
|
|
||||||
|
if (!proc.hasParent()) {
|
||||||
|
assert !proc.isFinished() : "unexpected finished procedure";
|
||||||
|
rollbackStack.put(proc.getProcId(), new RootProcedureState());
|
||||||
|
}
|
||||||
|
|
||||||
|
// add the procedure to the map
|
||||||
|
proc.beforeReplay(getEnvironment());
|
||||||
|
procedures.put(proc.getProcId(), proc);
|
||||||
|
|
||||||
|
if (proc.getState() == ProcedureState.RUNNABLE) {
|
||||||
|
runnablesCount++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// add the procedure to the map
|
|
||||||
proc.beforeReplay(getEnvironment());
|
|
||||||
procedures.put(proc.getProcId(), proc);
|
|
||||||
|
|
||||||
// add the nonce to the map
|
// add the nonce to the map
|
||||||
if (proc.getNonceKey() != null) {
|
if (nonceKey != null) {
|
||||||
nonceKeysToProcIdsMap.put(proc.getNonceKey(), proc.getProcId());
|
nonceKeysToProcIdsMap.put(nonceKey, procId);
|
||||||
}
|
|
||||||
|
|
||||||
if (proc.getState() == ProcedureState.RUNNABLE) {
|
|
||||||
runnablesCount++;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -347,8 +367,15 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
HashSet<Procedure> waitingSet = null;
|
HashSet<Procedure> waitingSet = null;
|
||||||
procIter.reset();
|
procIter.reset();
|
||||||
while (procIter.hasNext()) {
|
while (procIter.hasNext()) {
|
||||||
Procedure proc = procIter.next();
|
if (procIter.isNextCompleted()) {
|
||||||
if (LOG.isDebugEnabled()) {
|
procIter.skipNext();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
Procedure proc = procIter.nextAsProcedure();
|
||||||
|
assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
|
||||||
|
|
||||||
|
if (isDebugEnabled) {
|
||||||
LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s",
|
LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s",
|
||||||
proc.getState(), proc.hasException(), proc));
|
proc.getState(), proc.hasException(), proc));
|
||||||
}
|
}
|
||||||
|
@ -360,18 +387,6 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!proc.hasParent() && proc.isFinished()) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(String.format("The procedure is completed state=%s isFailed=%s",
|
|
||||||
proc.getState(), proc.hasException()));
|
|
||||||
}
|
|
||||||
assert !rollbackStack.containsKey(proc.getProcId());
|
|
||||||
procedures.remove(proc.getProcId());
|
|
||||||
completed.put(proc.getProcId(), Procedure.createProcedureInfo(proc, proc.getNonceKey()));
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (proc.hasParent() && !proc.isFinished()) {
|
if (proc.hasParent() && !proc.isFinished()) {
|
||||||
Procedure parent = procedures.get(proc.getParentProcId());
|
Procedure parent = procedures.get(proc.getParentProcId());
|
||||||
// corrupted procedures are handled later at step 3
|
// corrupted procedures are handled later at step 3
|
||||||
|
@ -850,13 +865,15 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (proc.getProcId() == rootProcId && proc.isSuccess()) {
|
if (proc.isSuccess()) {
|
||||||
// Finalize the procedure state
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Procedure completed in " +
|
LOG.debug("Procedure completed in " +
|
||||||
StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
|
StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
|
||||||
}
|
}
|
||||||
procedureFinished(proc);
|
// Finalize the procedure state
|
||||||
|
if (proc.getProcId() == rootProcId) {
|
||||||
|
procedureFinished(proc);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hbase.ProcedureInfo;
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -66,12 +67,27 @@ public interface ProcedureStore {
|
||||||
*/
|
*/
|
||||||
boolean hasNext();
|
boolean hasNext();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if the iterator next element is a completed procedure.
|
||||||
|
*/
|
||||||
|
boolean isNextCompleted();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Skip the next procedure
|
||||||
|
*/
|
||||||
|
void skipNext();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the next procedure in the iteration.
|
* Returns the next procedure in the iteration.
|
||||||
* @throws IOException if there was an error fetching/deserializing the procedure
|
* @throws IOException if there was an error fetching/deserializing the procedure
|
||||||
* @return the next procedure in the iteration.
|
* @return the next procedure in the iteration.
|
||||||
*/
|
*/
|
||||||
Procedure next() throws IOException;
|
Procedure nextAsProcedure() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the next procedure in the iteration as ProcedureInfo.
|
||||||
|
*/
|
||||||
|
ProcedureInfo nextAsProcedureInfo();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -414,7 +414,9 @@ public class ProcedureStoreTracker {
|
||||||
node.updateState(procId, isDeleted);
|
node.updateState(procId, isDeleted);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clear() {
|
public void reset() {
|
||||||
|
this.keepDeletes = false;
|
||||||
|
this.partial = false;
|
||||||
this.map.clear();
|
this.map.clear();
|
||||||
resetUpdates();
|
resetUpdates();
|
||||||
}
|
}
|
||||||
|
@ -579,11 +581,11 @@ public class ProcedureStoreTracker {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void readFrom(final InputStream stream) throws IOException {
|
public void readFrom(final InputStream stream) throws IOException {
|
||||||
ProcedureProtos.ProcedureStoreTracker data =
|
reset();
|
||||||
|
final ProcedureProtos.ProcedureStoreTracker data =
|
||||||
ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream);
|
ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream);
|
||||||
map.clear();
|
|
||||||
for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: data.getNodeList()) {
|
for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: data.getNodeList()) {
|
||||||
BitSetNode node = BitSetNode.convert(protoNode);
|
final BitSetNode node = BitSetNode.convert(protoNode);
|
||||||
map.put(node.getStart(), node);
|
map.put(node.getStart(), node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,7 +65,6 @@ public final class ProcedureWALFormat {
|
||||||
}
|
}
|
||||||
|
|
||||||
interface Loader extends ProcedureLoader {
|
interface Loader extends ProcedureLoader {
|
||||||
void removeLog(ProcedureWALFile log);
|
|
||||||
void markCorruptedWAL(ProcedureWALFile log, IOException e);
|
void markCorruptedWAL(ProcedureWALFile log, IOException e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,9 +22,10 @@ import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.hbase.ProcedureInfo;
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
|
||||||
|
@ -79,7 +80,7 @@ public class ProcedureWALFormatReader {
|
||||||
//
|
//
|
||||||
// Fast Start: INIT/INSERT record and StackIDs
|
// Fast Start: INIT/INSERT record and StackIDs
|
||||||
// ---------------------------------------------
|
// ---------------------------------------------
|
||||||
// We have to special record, INIT and INSERT that tracks the first time
|
// We have two special record, INIT and INSERT that tracks the first time
|
||||||
// the procedure was added to the WAL. We can use that information to be able
|
// the procedure was added to the WAL. We can use that information to be able
|
||||||
// to start procedures before reaching the end of the WAL, or before reading all the WALs.
|
// to start procedures before reaching the end of the WAL, or before reading all the WALs.
|
||||||
// but in some cases the WAL with that record can be already gone.
|
// but in some cases the WAL with that record can be already gone.
|
||||||
|
@ -146,10 +147,7 @@ public class ProcedureWALFormatReader {
|
||||||
loader.markCorruptedWAL(log, e);
|
loader.markCorruptedWAL(log, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (localProcedureMap.isEmpty()) {
|
if (!localProcedureMap.isEmpty()) {
|
||||||
LOG.info("No active entry found in state log " + log + ". removing it");
|
|
||||||
loader.removeLog(log);
|
|
||||||
} else {
|
|
||||||
log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId());
|
log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId());
|
||||||
procedureMap.mergeTail(localProcedureMap);
|
procedureMap.mergeTail(localProcedureMap);
|
||||||
//if (hasFastStartSupport) {
|
//if (hasFastStartSupport) {
|
||||||
|
@ -215,6 +213,7 @@ public class ProcedureWALFormatReader {
|
||||||
}
|
}
|
||||||
maxProcId = Math.max(maxProcId, entry.getProcId());
|
maxProcId = Math.max(maxProcId, entry.getProcId());
|
||||||
localProcedureMap.remove(entry.getProcId());
|
localProcedureMap.remove(entry.getProcId());
|
||||||
|
assert !procedureMap.contains(entry.getProcId());
|
||||||
tracker.setDeleted(entry.getProcId(), true);
|
tracker.setDeleted(entry.getProcId(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,6 +264,20 @@ public class ProcedureWALFormatReader {
|
||||||
public boolean hasParent() { return proto.hasParentId(); }
|
public boolean hasParent() { return proto.hasParentId(); }
|
||||||
public boolean isReady() { return ready; }
|
public boolean isReady() { return ready; }
|
||||||
|
|
||||||
|
public boolean isCompleted() {
|
||||||
|
if (!hasParent()) {
|
||||||
|
switch (proto.getState()) {
|
||||||
|
case ROLLEDBACK:
|
||||||
|
return true;
|
||||||
|
case FINISHED:
|
||||||
|
return !proto.hasException();
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
public Procedure convert() throws IOException {
|
public Procedure convert() throws IOException {
|
||||||
if (procedure == null) {
|
if (procedure == null) {
|
||||||
procedure = Procedure.convert(proto);
|
procedure = Procedure.convert(proto);
|
||||||
|
@ -272,6 +285,10 @@ public class ProcedureWALFormatReader {
|
||||||
return procedure;
|
return procedure;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ProcedureInfo convertToInfo() {
|
||||||
|
return ProcedureInfo.convert(proto);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "Entry(" + getProcId() + ", parentId=" + getParentId() + ")";
|
return "Entry(" + getProcId() + ", parentId=" + getParentId() + ")";
|
||||||
|
@ -298,13 +315,32 @@ public class ProcedureWALFormatReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Procedure next() throws IOException {
|
public boolean isNextCompleted() {
|
||||||
|
return current != null && current.isCompleted();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void skipNext() {
|
||||||
|
current = current.replayNext;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Procedure nextAsProcedure() throws IOException {
|
||||||
try {
|
try {
|
||||||
return current.convert();
|
return current.convert();
|
||||||
} finally {
|
} finally {
|
||||||
current = current.replayNext;
|
current = current.replayNext;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ProcedureInfo nextAsProcedureInfo() {
|
||||||
|
try {
|
||||||
|
return current.convertToInfo();
|
||||||
|
} finally {
|
||||||
|
current = current.replayNext;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class WalProcedureMap {
|
private static class WalProcedureMap {
|
||||||
|
|
|
@ -236,8 +236,13 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
return storeTracker;
|
return storeTracker;
|
||||||
}
|
}
|
||||||
|
|
||||||
public LinkedList<ProcedureWALFile> getActiveLogs() {
|
public ArrayList<ProcedureWALFile> getActiveLogs() {
|
||||||
return logs;
|
lock.lock();
|
||||||
|
try {
|
||||||
|
return new ArrayList<ProcedureWALFile>(logs);
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<ProcedureWALFile> getCorruptedLogs() {
|
public Set<ProcedureWALFile> getCorruptedLogs() {
|
||||||
|
@ -296,7 +301,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load the old logs
|
// Load the old logs
|
||||||
final ArrayList<ProcedureWALFile> toRemove = new ArrayList<ProcedureWALFile>();
|
|
||||||
Iterator<ProcedureWALFile> it = logs.descendingIterator();
|
Iterator<ProcedureWALFile> it = logs.descendingIterator();
|
||||||
it.next(); // Skip the current log
|
it.next(); // Skip the current log
|
||||||
try {
|
try {
|
||||||
|
@ -316,11 +320,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
loader.handleCorrupted(procIter);
|
loader.handleCorrupted(procIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void removeLog(ProcedureWALFile log) {
|
|
||||||
toRemove.add(log);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
|
public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
|
||||||
if (corruptedLogs == null) {
|
if (corruptedLogs == null) {
|
||||||
|
@ -331,11 +330,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} finally {
|
} finally {
|
||||||
if (!toRemove.isEmpty()) {
|
|
||||||
for (ProcedureWALFile log: toRemove) {
|
|
||||||
removeLogFile(log);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
loading.set(false);
|
loading.set(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -584,6 +578,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
totalSynced = syncSlots(stream, slots, 0, slotIndex);
|
totalSynced = syncSlots(stream, slots, 0, slotIndex);
|
||||||
break;
|
break;
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
|
LOG.warn("unable to sync slots, retry=" + retry);
|
||||||
if (++retry >= maxRetriesBeforeRoll) {
|
if (++retry >= maxRetriesBeforeRoll) {
|
||||||
if (logRolled >= maxSyncFailureRoll) {
|
if (logRolled >= maxSyncFailureRoll) {
|
||||||
LOG.error("Sync slots after log roll failed, abort.", e);
|
LOG.error("Sync slots after log roll failed, abort.", e);
|
||||||
|
@ -627,14 +622,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean rollWriterOrDie() {
|
private boolean rollWriterOrDie() {
|
||||||
for (int i = 1; i <= rollRetries; ++i) {
|
for (int i = 0; i < rollRetries; ++i) {
|
||||||
|
if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (rollWriter()) {
|
if (rollWriter()) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Unable to roll the log, attempt=" + i, e);
|
LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
|
||||||
Threads.sleepWithoutInterrupt(waitBeforeRoll);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.fatal("Unable to roll the log");
|
LOG.fatal("Unable to roll the log");
|
||||||
|
@ -881,7 +877,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getMaxLogId(final FileStatus[] logFiles) {
|
private static long getMaxLogId(final FileStatus[] logFiles) {
|
||||||
long maxLogId = 0;
|
long maxLogId = 0;
|
||||||
if (logFiles != null && logFiles.length > 0) {
|
if (logFiles != null && logFiles.length > 0) {
|
||||||
for (int i = 0; i < logFiles.length; ++i) {
|
for (int i = 0; i < logFiles.length; ++i) {
|
||||||
|
@ -924,7 +920,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
|
LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
|
||||||
// try the next one...
|
// try the next one...
|
||||||
storeTracker.clear();
|
storeTracker.reset();
|
||||||
storeTracker.setPartialFlag(true);
|
storeTracker.setPartialFlag(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -192,7 +192,7 @@ public class TestProcedureStoreTracker {
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
|
|
||||||
tracker.clear();
|
tracker.reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -212,7 +212,7 @@ public class TestProcedureStoreTracker {
|
||||||
tracker.setDeleted(i, false);
|
tracker.setDeleted(i, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
tracker.clear();
|
tracker.reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -159,6 +159,56 @@ public class TestWALProcedureStore {
|
||||||
verifyProcIdsOnRestart(procIds);
|
verifyProcIdsOnRestart(procIds);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoTrailerDoubleRestart() throws Exception {
|
||||||
|
// log-0001: proc 0, 1 and 2 are inserted
|
||||||
|
Procedure proc0 = new TestSequentialProcedure();
|
||||||
|
procStore.insert(proc0, null);
|
||||||
|
Procedure proc1 = new TestSequentialProcedure();
|
||||||
|
procStore.insert(proc1, null);
|
||||||
|
Procedure proc2 = new TestSequentialProcedure();
|
||||||
|
procStore.insert(proc2, null);
|
||||||
|
procStore.rollWriterForTesting();
|
||||||
|
|
||||||
|
// log-0002: proc 1 deleted
|
||||||
|
procStore.delete(proc1.getProcId());
|
||||||
|
procStore.rollWriterForTesting();
|
||||||
|
|
||||||
|
// log-0003: proc 2 is update
|
||||||
|
procStore.update(proc2);
|
||||||
|
procStore.rollWriterForTesting();
|
||||||
|
|
||||||
|
// log-0004: proc 2 deleted
|
||||||
|
procStore.delete(proc2.getProcId());
|
||||||
|
|
||||||
|
// stop the store and remove the trailer
|
||||||
|
procStore.stop(false);
|
||||||
|
FileStatus[] logs = fs.listStatus(logDir);
|
||||||
|
assertEquals(4, logs.length);
|
||||||
|
for (int i = 0; i < logs.length; ++i) {
|
||||||
|
corruptLog(logs[i], 4);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test Load 1
|
||||||
|
LoadCounter loader = new LoadCounter();
|
||||||
|
storeRestart(loader);
|
||||||
|
assertEquals(1, loader.getLoadedCount());
|
||||||
|
assertEquals(0, loader.getCorruptedCount());
|
||||||
|
|
||||||
|
// Test Load 2
|
||||||
|
assertEquals(5, fs.listStatus(logDir).length);
|
||||||
|
loader = new LoadCounter();
|
||||||
|
storeRestart(loader);
|
||||||
|
assertEquals(1, loader.getLoadedCount());
|
||||||
|
assertEquals(0, loader.getCorruptedCount());
|
||||||
|
|
||||||
|
// remove proc-0
|
||||||
|
procStore.delete(proc0.getProcId());
|
||||||
|
procStore.periodicRollForTesting();
|
||||||
|
assertEquals(1, fs.listStatus(logDir).length);
|
||||||
|
storeRestart(loader);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCorruptedTrailer() throws Exception {
|
public void testCorruptedTrailer() throws Exception {
|
||||||
// Insert something
|
// Insert something
|
||||||
|
@ -290,9 +340,9 @@ public class TestWALProcedureStore {
|
||||||
@Override
|
@Override
|
||||||
public void load(ProcedureIterator procIter) throws IOException {
|
public void load(ProcedureIterator procIter) throws IOException {
|
||||||
assertTrue(procIter.hasNext());
|
assertTrue(procIter.hasNext());
|
||||||
assertEquals(1, procIter.next().getProcId());
|
assertEquals(1, procIter.nextAsProcedureInfo().getProcId());
|
||||||
assertTrue(procIter.hasNext());
|
assertTrue(procIter.hasNext());
|
||||||
assertEquals(2, procIter.next().getProcId());
|
assertEquals(2, procIter.nextAsProcedureInfo().getProcId());
|
||||||
assertFalse(procIter.hasNext());
|
assertFalse(procIter.hasNext());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -346,16 +396,16 @@ public class TestWALProcedureStore {
|
||||||
@Override
|
@Override
|
||||||
public void load(ProcedureIterator procIter) throws IOException {
|
public void load(ProcedureIterator procIter) throws IOException {
|
||||||
assertTrue(procIter.hasNext());
|
assertTrue(procIter.hasNext());
|
||||||
assertEquals(4, procIter.next().getProcId());
|
assertEquals(4, procIter.nextAsProcedureInfo().getProcId());
|
||||||
// TODO: This will be multiple call once we do fast-start
|
// TODO: This will be multiple call once we do fast-start
|
||||||
//assertFalse(procIter.hasNext());
|
//assertFalse(procIter.hasNext());
|
||||||
|
|
||||||
assertTrue(procIter.hasNext());
|
assertTrue(procIter.hasNext());
|
||||||
assertEquals(1, procIter.next().getProcId());
|
assertEquals(1, procIter.nextAsProcedureInfo().getProcId());
|
||||||
assertTrue(procIter.hasNext());
|
assertTrue(procIter.hasNext());
|
||||||
assertEquals(2, procIter.next().getProcId());
|
assertEquals(2, procIter.nextAsProcedureInfo().getProcId());
|
||||||
assertTrue(procIter.hasNext());
|
assertTrue(procIter.hasNext());
|
||||||
assertEquals(3, procIter.next().getProcId());
|
assertEquals(3, procIter.nextAsProcedureInfo().getProcId());
|
||||||
assertFalse(procIter.hasNext());
|
assertFalse(procIter.hasNext());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -580,7 +630,7 @@ public class TestWALProcedureStore {
|
||||||
@Override
|
@Override
|
||||||
public void load(ProcedureIterator procIter) throws IOException {
|
public void load(ProcedureIterator procIter) throws IOException {
|
||||||
while (procIter.hasNext()) {
|
while (procIter.hasNext()) {
|
||||||
Procedure proc = procIter.next();
|
Procedure proc = procIter.nextAsProcedure();
|
||||||
LOG.debug("loading procId=" + proc.getProcId() + ": " + proc);
|
LOG.debug("loading procId=" + proc.getProcId() + ": " + proc);
|
||||||
if (procIds != null) {
|
if (procIds != null) {
|
||||||
assertTrue("procId=" + proc.getProcId() + " unexpected",
|
assertTrue("procId=" + proc.getProcId() + " unexpected",
|
||||||
|
@ -593,7 +643,7 @@ public class TestWALProcedureStore {
|
||||||
@Override
|
@Override
|
||||||
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
|
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
|
||||||
while (procIter.hasNext()) {
|
while (procIter.hasNext()) {
|
||||||
Procedure proc = procIter.next();
|
Procedure proc = procIter.nextAsProcedure();
|
||||||
LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
|
LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
|
||||||
corrupted.add(proc);
|
corrupted.add(proc);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue