HBASE-14947 WALProcedureStore improvements

This commit is contained in:
Matteo Bertozzi 2015-12-15 10:15:18 -08:00
parent b8539c62e8
commit 60d33ce341
5 changed files with 203 additions and 202 deletions

View File

@ -27,7 +27,6 @@ import java.util.TreeMap;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
/** /**
@ -356,25 +355,19 @@ public class ProcedureStoreTracker {
} }
} }
public void insert(final Procedure proc, final Procedure[] subprocs) {
insert(proc.getProcId());
if (subprocs != null) {
for (int i = 0; i < subprocs.length; ++i) {
insert(subprocs[i].getProcId());
}
}
}
public void update(final Procedure proc) {
update(proc.getProcId());
}
public void insert(long procId) { public void insert(long procId) {
BitSetNode node = getOrCreateNode(procId); BitSetNode node = getOrCreateNode(procId);
node.update(procId); node.update(procId);
trackProcIds(procId); trackProcIds(procId);
} }
public void insert(final long procId, final long[] subProcIds) {
update(procId);
for (int i = 0; i < subProcIds.length; ++i) {
insert(subProcIds[i]);
}
}
public void update(long procId) { public void update(long procId) {
Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId); Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
assert entry != null : "expected node to update procId=" + procId; assert entry != null : "expected node to update procId=" + procId;

View File

@ -100,7 +100,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>(); private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker(); private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
private final AtomicLong inactiveLogsMaxId = new AtomicLong(0);
private final ReentrantLock lock = new ReentrantLock(); private final ReentrantLock lock = new ReentrantLock();
private final Condition waitCond = lock.newCondition(); private final Condition waitCond = lock.newCondition();
private final Condition slotCond = lock.newCondition(); private final Condition slotCond = lock.newCondition();
@ -191,19 +190,16 @@ public class WALProcedureStore extends ProcedureStoreBase {
} }
LOG.info("Stopping the WAL Procedure Store"); LOG.info("Stopping the WAL Procedure Store");
if (lock.tryLock()) { sendStopSignal();
try {
waitCond.signalAll();
syncCond.signalAll();
} finally {
lock.unlock();
}
}
if (!abort) { if (!abort) {
try { try {
syncThread.join(); while (syncThread.isAlive()) {
sendStopSignal();
syncThread.join(250);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn("join interrupted", e);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
@ -220,6 +216,17 @@ public class WALProcedureStore extends ProcedureStoreBase {
logs.clear(); logs.clear();
} }
private void sendStopSignal() {
if (lock.tryLock()) {
try {
waitCond.signalAll();
syncCond.signalAll();
} finally {
lock.unlock();
}
}
}
@Override @Override
public int getNumThreads() { public int getNumThreads() {
return slots == null ? 0 : slots.length; return slots == null ? 0 : slots.length;
@ -239,6 +246,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
@Override @Override
public void recoverLease() throws IOException { public void recoverLease() throws IOException {
lock.lock();
try {
LOG.info("Starting WAL Procedure Store lease recovery"); LOG.info("Starting WAL Procedure Store lease recovery");
FileStatus[] oldLogs = getLogFiles(); FileStatus[] oldLogs = getLogFiles();
while (isRunning()) { while (isRunning()) {
@ -265,6 +274,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
LOG.info("Lease acquired for flushLogId: " + flushLogId); LOG.info("Lease acquired for flushLogId: " + flushLogId);
break; break;
} }
} finally {
lock.unlock();
}
} }
@Override @Override
@ -335,18 +347,22 @@ public class WALProcedureStore extends ProcedureStoreBase {
} }
ByteSlot slot = acquireSlot(); ByteSlot slot = acquireSlot();
long logId = -1;
try { try {
// Serialize the insert // Serialize the insert
long[] subProcIds = null;
if (subprocs != null) { if (subprocs != null) {
ProcedureWALFormat.writeInsert(slot, proc, subprocs); ProcedureWALFormat.writeInsert(slot, proc, subprocs);
subProcIds = new long[subprocs.length];
for (int i = 0; i < subprocs.length; ++i) {
subProcIds[i] = subprocs[i].getProcId();
}
} else { } else {
assert !proc.hasParent(); assert !proc.hasParent();
ProcedureWALFormat.writeInsert(slot, proc); ProcedureWALFormat.writeInsert(slot, proc);
} }
// Push the transaction data and wait until it is persisted // Push the transaction data and wait until it is persisted
pushData(slot); pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
} catch (IOException e) { } catch (IOException e) {
// We are not able to serialize the procedure. // We are not able to serialize the procedure.
// this is a code error, and we are not able to go on. // this is a code error, and we are not able to go on.
@ -356,14 +372,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
} finally { } finally {
releaseSlot(slot); releaseSlot(slot);
} }
// Update the store tracker
synchronized (storeTracker) {
storeTracker.insert(proc, subprocs);
if (logId == flushLogId) {
checkAndTryRoll();
}
}
} }
@Override @Override
@ -373,13 +381,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
} }
ByteSlot slot = acquireSlot(); ByteSlot slot = acquireSlot();
long logId = -1;
try { try {
// Serialize the update // Serialize the update
ProcedureWALFormat.writeUpdate(slot, proc); ProcedureWALFormat.writeUpdate(slot, proc);
// Push the transaction data and wait until it is persisted // Push the transaction data and wait until it is persisted
logId = pushData(slot); pushData(PushType.UPDATE, slot, proc.getProcId(), null);
} catch (IOException e) { } catch (IOException e) {
// We are not able to serialize the procedure. // We are not able to serialize the procedure.
// this is a code error, and we are not able to go on. // this is a code error, and we are not able to go on.
@ -388,20 +395,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
} finally { } finally {
releaseSlot(slot); releaseSlot(slot);
} }
// Update the store tracker
boolean removeOldLogs = false;
synchronized (storeTracker) {
storeTracker.update(proc);
if (logId == flushLogId) {
removeOldLogs = storeTracker.isUpdated();
checkAndTryRoll();
}
}
if (removeOldLogs) {
setInactiveLogsMaxId(logId - 1);
}
} }
@Override @Override
@ -411,13 +404,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
} }
ByteSlot slot = acquireSlot(); ByteSlot slot = acquireSlot();
long logId = -1;
try { try {
// Serialize the delete // Serialize the delete
ProcedureWALFormat.writeDelete(slot, procId); ProcedureWALFormat.writeDelete(slot, procId);
// Push the transaction data and wait until it is persisted // Push the transaction data and wait until it is persisted
logId = pushData(slot); pushData(PushType.DELETE, slot, procId, null);
} catch (IOException e) { } catch (IOException e) {
// We are not able to serialize the procedure. // We are not able to serialize the procedure.
// this is a code error, and we are not able to go on. // this is a code error, and we are not able to go on.
@ -426,22 +418,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
} finally { } finally {
releaseSlot(slot); releaseSlot(slot);
} }
boolean removeOldLogs = false;
synchronized (storeTracker) {
storeTracker.delete(procId);
if (logId == flushLogId) {
if (storeTracker.isEmpty() || storeTracker.isUpdated()) {
removeOldLogs = checkAndTryRoll();
} else {
checkAndTryRoll();
}
}
}
if (removeOldLogs) {
setInactiveLogsMaxId(logId);
}
} }
private ByteSlot acquireSlot() { private ByteSlot acquireSlot() {
@ -454,7 +430,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
slotsCache.offer(slot); slotsCache.offer(slot);
} }
private long pushData(final ByteSlot slot) { private enum PushType { INSERT, UPDATE, DELETE };
private long pushData(final PushType type, final ByteSlot slot,
final long procId, final long[] subProcIds) {
if (!isRunning()) { if (!isRunning()) {
throw new RuntimeException("the store must be running before inserting data"); throw new RuntimeException("the store must be running before inserting data");
} }
@ -481,6 +460,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
} }
} }
updateStoreTracker(type, procId, subProcIds);
slots[slotIndex++] = slot; slots[slotIndex++] = slot;
logId = flushLogId; logId = flushLogId;
@ -509,20 +489,29 @@ public class WALProcedureStore extends ProcedureStoreBase {
return logId; return logId;
} }
private boolean isSyncAborted() { private void updateStoreTracker(final PushType type,
return syncException.get() != null; final long procId, final long[] subProcIds) {
switch (type) {
case INSERT:
if (subProcIds == null) {
storeTracker.insert(procId);
} else {
storeTracker.insert(procId, subProcIds);
}
break;
case UPDATE:
storeTracker.update(procId);
break;
case DELETE:
storeTracker.delete(procId);
break;
default:
throw new RuntimeException("invalid push type " + type);
}
} }
protected void periodicRoll() throws IOException { private boolean isSyncAborted() {
long logId; return syncException.get() != null;
boolean removeOldLogs;
synchronized (storeTracker) {
logId = flushLogId;
removeOldLogs = storeTracker.isEmpty();
}
if (checkAndTryRoll() && removeOldLogs) {
setInactiveLogsMaxId(logId);
}
} }
private void syncLoop() throws Throwable { private void syncLoop() throws Throwable {
@ -534,7 +523,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
// Wait until new data is available // Wait until new data is available
if (slotIndex == 0) { if (slotIndex == 0) {
if (!loading.get()) { if (!loading.get()) {
removeInactiveLogs(); periodicRoll();
} }
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -547,7 +536,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS); waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
if (slotIndex == 0) { if (slotIndex == 0) {
// no data.. probably a stop() or a periodic roll // no data.. probably a stop() or a periodic roll
periodicRoll();
continue; continue;
} }
} }
@ -560,13 +548,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
long syncWaitMs = System.currentTimeMillis() - syncWaitSt; long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) { if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
float rollSec = getMillisFromLastRoll() / 1000.0f; float rollSec = getMillisFromLastRoll() / 1000.0f;
LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s/sec", LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
StringUtils.humanTimeDiff(syncWaitMs), slotIndex, StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
StringUtils.humanSize(totalSynced.get()), StringUtils.humanSize(totalSynced.get()),
StringUtils.humanSize(totalSynced.get() / rollSec))); StringUtils.humanSize(totalSynced.get() / rollSec)));
} }
inSync.set(true); inSync.set(true);
totalSynced.addAndGet(syncSlots()); totalSynced.addAndGet(syncSlots());
slotIndex = 0; slotIndex = 0;
@ -639,8 +626,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
return totalSynced; return totalSynced;
} }
@VisibleForTesting private boolean rollWriterOrDie() {
public boolean rollWriterOrDie() {
for (int i = 1; i <= rollRetries; ++i) { for (int i = 1; i <= rollRetries; ++i) {
try { try {
if (rollWriter()) { if (rollWriter()) {
@ -656,18 +642,14 @@ public class WALProcedureStore extends ProcedureStoreBase {
throw new RuntimeException("unable to roll the log"); throw new RuntimeException("unable to roll the log");
} }
protected boolean checkAndTryRoll() { private boolean tryRollWriter() {
if (!isRunning()) return false;
if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
try { try {
return rollWriter(); return rollWriter();
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Unable to roll the log", e); LOG.warn("Unable to roll the log", e);
}
}
return false; return false;
} }
}
private long getMillisToNextPeriodicRoll() { private long getMillisToNextPeriodicRoll() {
if (lastRollTs.get() > 0 && periodicRollMsec > 0) { if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
@ -680,7 +662,52 @@ public class WALProcedureStore extends ProcedureStoreBase {
return (System.currentTimeMillis() - lastRollTs.get()); return (System.currentTimeMillis() - lastRollTs.get());
} }
protected boolean rollWriter() throws IOException { @VisibleForTesting
protected void periodicRollForTesting() throws IOException {
lock.lock();
try {
periodicRoll();
} finally {
lock.unlock();
}
}
@VisibleForTesting
protected boolean rollWriterForTesting() throws IOException {
lock.lock();
try {
return rollWriter();
} finally {
lock.unlock();
}
}
private void periodicRoll() throws IOException {
if (storeTracker.isEmpty()) {
if (LOG.isTraceEnabled()) {
LOG.trace("no active procedures");
}
tryRollWriter();
removeAllLogs(flushLogId - 1);
} else {
if (storeTracker.isUpdated()) {
if (LOG.isTraceEnabled()) {
LOG.trace("all the active procedures are in the latest log");
}
removeAllLogs(flushLogId - 1);
}
// if the log size has exceeded the roll threshold
// or the periodic roll timeout is expired, try to roll the wal.
if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
tryRollWriter();
}
removeInactiveLogs();
}
}
private boolean rollWriter() throws IOException {
// Create new state-log // Create new state-log
if (!rollWriter(flushLogId + 1)) { if (!rollWriter(flushLogId + 1)) {
LOG.warn("someone else has already created log " + flushLogId); LOG.warn("someone else has already created log " + flushLogId);
@ -701,6 +728,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
private boolean rollWriter(final long logId) throws IOException { private boolean rollWriter(final long logId) throws IOException {
assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId; assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();
ProcedureWALHeader header = ProcedureWALHeader.newBuilder() ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
.setVersion(ProcedureWALFormat.HEADER_VERSION) .setVersion(ProcedureWALFormat.HEADER_VERSION)
@ -730,20 +758,16 @@ public class WALProcedureStore extends ProcedureStoreBase {
newStream.close(); newStream.close();
return false; return false;
} }
lock.lock();
try {
closeStream(); closeStream();
synchronized (storeTracker) {
storeTracker.resetUpdates(); storeTracker.resetUpdates();
}
stream = newStream; stream = newStream;
flushLogId = logId; flushLogId = logId;
totalSynced.set(0); totalSynced.set(0);
lastRollTs.set(System.currentTimeMillis()); lastRollTs.set(System.currentTimeMillis());
logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos)); logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
} finally {
lock.unlock();
}
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Roll new state log: " + logId); LOG.debug("Roll new state log: " + logId);
} }
@ -754,11 +778,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
try { try {
if (stream != null) { if (stream != null) {
try { try {
synchronized (storeTracker) {
ProcedureWALFile log = logs.getLast(); ProcedureWALFile log = logs.getLast();
log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId()); log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
ProcedureWALFormat.writeTrailer(stream, storeTracker); ProcedureWALFormat.writeTrailer(stream, storeTracker);
}
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Unable to write the trailer: " + e.getMessage()); LOG.warn("Unable to write the trailer: " + e.getMessage());
} }
@ -774,31 +796,13 @@ public class WALProcedureStore extends ProcedureStoreBase {
// ========================================================================== // ==========================================================================
// Log Files cleaner helpers // Log Files cleaner helpers
// ========================================================================== // ==========================================================================
private void setInactiveLogsMaxId(long logId) {
long expect = 0;
while (!inactiveLogsMaxId.compareAndSet(expect, logId)) {
expect = inactiveLogsMaxId.get();
if (expect >= logId) {
break;
}
}
}
private void removeInactiveLogs() { private void removeInactiveLogs() {
long lastLogId = inactiveLogsMaxId.get();
if (lastLogId != 0) {
removeAllLogs(lastLogId);
inactiveLogsMaxId.compareAndSet(lastLogId, 0);
}
// Verify if the ProcId of the first oldest is still active. if not remove the file. // Verify if the ProcId of the first oldest is still active. if not remove the file.
while (logs.size() > 1) { while (logs.size() > 1) {
ProcedureWALFile log = logs.getFirst(); ProcedureWALFile log = logs.getFirst();
synchronized (storeTracker) {
if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) { if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) {
break; break;
} }
}
removeLogFile(log); removeLogFile(log);
} }
} }

View File

@ -313,7 +313,7 @@ public class TestProcedureRecovery {
public void testRunningProcWithSameNonce() throws Exception { public void testRunningProcWithSameNonce() throws Exception {
final long nonceGroup = 456; final long nonceGroup = 456;
final long nonce = 33333; final long nonce = 33333;
Procedure proc = new TestMultiStepProcedure(); Procedure proc = new TestSingleStepProcedure();
long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce); long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
// Restart (use a latch to prevent the step execution until we submitted proc2) // Restart (use a latch to prevent the step execution until we submitted proc2)
@ -321,7 +321,7 @@ public class TestProcedureRecovery {
procEnv.setWaitLatch(latch); procEnv.setWaitLatch(latch);
restart(); restart();
// Submit a procedure with the same nonce and expect the same procedure would return. // Submit a procedure with the same nonce and expect the same procedure would return.
Procedure proc2 = new TestMultiStepProcedure(); Procedure proc2 = new TestSingleStepProcedure();
long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce); long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce);
latch.countDown(); latch.countDown();
procEnv.setWaitLatch(null); procEnv.setWaitLatch(null);

View File

@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -42,27 +41,6 @@ import static org.junit.Assert.fail;
public class TestProcedureStoreTracker { public class TestProcedureStoreTracker {
private static final Log LOG = LogFactory.getLog(TestProcedureStoreTracker.class); private static final Log LOG = LogFactory.getLog(TestProcedureStoreTracker.class);
static class TestProcedure extends Procedure<Void> {
public TestProcedure(long procId) {
setProcId(procId);
}
@Override
protected Procedure[] execute(Void env) { return null; }
@Override
protected void rollback(Void env) { /* no-op */ }
@Override
protected boolean abort(Void env) { return false; }
@Override
protected void serializeStateData(final OutputStream stream) { /* no-op */ }
@Override
protected void deserializeStateData(final InputStream stream) { /* no-op */ }
}
@Test @Test
public void testSeqInsertAndDelete() { public void testSeqInsertAndDelete() {
ProcedureStoreTracker tracker = new ProcedureStoreTracker(); ProcedureStoreTracker tracker = new ProcedureStoreTracker();
@ -162,13 +140,10 @@ public class TestProcedureStoreTracker {
ProcedureStoreTracker tracker = new ProcedureStoreTracker(); ProcedureStoreTracker tracker = new ProcedureStoreTracker();
assertTrue(tracker.isEmpty()); assertTrue(tracker.isEmpty());
Procedure[] procs = new TestProcedure[] { long[] procs = new long[] { 1, 2, 3, 4, 5, 6 };
new TestProcedure(1), new TestProcedure(2), new TestProcedure(3),
new TestProcedure(4), new TestProcedure(5), new TestProcedure(6),
};
tracker.insert(procs[0], null); tracker.insert(procs[0]);
tracker.insert(procs[1], new Procedure[] { procs[2], procs[3], procs[4] }); tracker.insert(procs[1], new long[] { procs[2], procs[3], procs[4] });
assertFalse(tracker.isEmpty()); assertFalse(tracker.isEmpty());
assertTrue(tracker.isUpdated()); assertTrue(tracker.isUpdated());
@ -190,11 +165,11 @@ public class TestProcedureStoreTracker {
assertTrue(tracker.isUpdated()); assertTrue(tracker.isUpdated());
for (int i = 0; i < 5; ++i) { for (int i = 0; i < 5; ++i) {
tracker.delete(procs[i].getProcId()); tracker.delete(procs[i]);
assertFalse(tracker.isEmpty()); assertFalse(tracker.isEmpty());
assertTrue(tracker.isUpdated()); assertTrue(tracker.isUpdated());
} }
tracker.delete(procs[5].getProcId()); tracker.delete(procs[5]);
assertTrue(tracker.isEmpty()); assertTrue(tracker.isEmpty());
} }

View File

@ -102,7 +102,7 @@ public class TestWALProcedureStore {
@Test @Test
public void testEmptyRoll() throws Exception { public void testEmptyRoll() throws Exception {
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
procStore.periodicRoll(); procStore.periodicRollForTesting();
} }
FileStatus[] status = fs.listStatus(logDir); FileStatus[] status = fs.listStatus(logDir);
assertEquals(1, status.length); assertEquals(1, status.length);
@ -214,14 +214,14 @@ public class TestWALProcedureStore {
procStore.update(rootProcs[i-1]); procStore.update(rootProcs[i-1]);
} }
// insert root-child txn // insert root-child txn
procStore.rollWriter(); procStore.rollWriterForTesting();
for (int i = 1; i <= rootProcs.length; i++) { for (int i = 1; i <= rootProcs.length; i++) {
TestProcedure b = new TestProcedure(rootProcs.length + i, i); TestProcedure b = new TestProcedure(rootProcs.length + i, i);
rootProcs[i-1].addStackId(1); rootProcs[i-1].addStackId(1);
procStore.insert(rootProcs[i-1], new Procedure[] { b }); procStore.insert(rootProcs[i-1], new Procedure[] { b });
} }
// insert child updates // insert child updates
procStore.rollWriter(); procStore.rollWriterForTesting();
for (int i = 1; i <= rootProcs.length; i++) { for (int i = 1; i <= rootProcs.length; i++) {
procStore.update(new TestProcedure(rootProcs.length + i, i)); procStore.update(new TestProcedure(rootProcs.length + i, i));
} }
@ -229,9 +229,10 @@ public class TestWALProcedureStore {
// Stop the store // Stop the store
procStore.stop(false); procStore.stop(false);
// Remove 4 byte from the trailer // the first log was removed,
// we have insert-txn and updates in the others so everything is fine
FileStatus[] logs = fs.listStatus(logDir); FileStatus[] logs = fs.listStatus(logDir);
assertEquals(3, logs.length); assertEquals(Arrays.toString(logs), 2, logs.length);
Arrays.sort(logs, new Comparator<FileStatus>() { Arrays.sort(logs, new Comparator<FileStatus>() {
@Override @Override
public int compare(FileStatus o1, FileStatus o2) { public int compare(FileStatus o1, FileStatus o2) {
@ -239,15 +240,13 @@ public class TestWALProcedureStore {
} }
}); });
// Remove the first log, we have insert-txn and updates in the others so everything is fine.
fs.delete(logs[0].getPath(), false);
LoadCounter loader = new LoadCounter(); LoadCounter loader = new LoadCounter();
storeRestart(loader); storeRestart(loader);
assertEquals(rootProcs.length * 2, loader.getLoadedCount()); assertEquals(rootProcs.length * 2, loader.getLoadedCount());
assertEquals(0, loader.getCorruptedCount()); assertEquals(0, loader.getCorruptedCount());
// Remove the second log, we have lost any root/parent references // Remove the second log, we have lost all the root/parent references
fs.delete(logs[1].getPath(), false); fs.delete(logs[0].getPath(), false);
loader.reset(); loader.reset();
storeRestart(loader); storeRestart(loader);
assertEquals(0, loader.getLoadedCount()); assertEquals(0, loader.getLoadedCount());
@ -276,7 +275,7 @@ public class TestWALProcedureStore {
b.addStackId(1); b.addStackId(1);
procStore.update(b); procStore.update(b);
procStore.rollWriter(); procStore.rollWriterForTesting();
a.addStackId(2); a.addStackId(2);
procStore.update(a); procStore.update(a);
@ -325,7 +324,7 @@ public class TestWALProcedureStore {
b.addStackId(2); b.addStackId(2);
procStore.update(b); procStore.update(b);
procStore.rollWriter(); procStore.rollWriterForTesting();
b.addStackId(3); b.addStackId(3);
procStore.update(b); procStore.update(b);
@ -426,6 +425,36 @@ public class TestWALProcedureStore {
assertEquals(1, procStore.getActiveLogs().size()); assertEquals(1, procStore.getActiveLogs().size());
} }
@Test
public void testRollAndRemove() throws IOException {
// Insert something in the log
Procedure proc1 = new TestSequentialProcedure();
procStore.insert(proc1, null);
Procedure proc2 = new TestSequentialProcedure();
procStore.insert(proc2, null);
// roll the log, now we have 2
procStore.rollWriterForTesting();
assertEquals(2, procStore.getActiveLogs().size());
// everything will be up to date in the second log
// so we can remove the first one
procStore.update(proc1);
procStore.update(proc2);
assertEquals(1, procStore.getActiveLogs().size());
// roll the log, now we have 2
procStore.rollWriterForTesting();
assertEquals(2, procStore.getActiveLogs().size());
// remove everything active
// so we can remove all the logs
procStore.delete(proc1.getProcId());
procStore.delete(proc2.getProcId());
assertEquals(1, procStore.getActiveLogs().size());
}
private void corruptLog(final FileStatus logFile, final long dropBytes) private void corruptLog(final FileStatus logFile, final long dropBytes)
throws IOException { throws IOException {
assertTrue(logFile.getLen() > dropBytes); assertTrue(logFile.getLen() > dropBytes);