HBASE-21354 Procedure may be deleted improperly during master restarts resulting in 'Corrupt'
This commit is contained in:
parent
ae13b0b293
commit
86f23128b0
|
@ -203,7 +203,7 @@ public class ProcedureStoreTracker {
|
||||||
* then we mark it as deleted.
|
* then we mark it as deleted.
|
||||||
* @see #setDeletedIfModified(long...)
|
* @see #setDeletedIfModified(long...)
|
||||||
*/
|
*/
|
||||||
public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) {
|
public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker, boolean globalTracker) {
|
||||||
BitSetNode trackerNode = null;
|
BitSetNode trackerNode = null;
|
||||||
for (BitSetNode node : map.values()) {
|
for (BitSetNode node : map.values()) {
|
||||||
final long minProcId = node.getStart();
|
final long minProcId = node.getStart();
|
||||||
|
@ -214,9 +214,26 @@ public class ProcedureStoreTracker {
|
||||||
}
|
}
|
||||||
|
|
||||||
trackerNode = tracker.lookupClosestNode(trackerNode, procId);
|
trackerNode = tracker.lookupClosestNode(trackerNode, procId);
|
||||||
if (trackerNode == null || !trackerNode.contains(procId) ||
|
if (trackerNode == null || !trackerNode.contains(procId)) {
|
||||||
trackerNode.isModified(procId)) {
|
// the procId does not exist in the tracker, we can only delete the proc
|
||||||
// the procedure was removed or modified
|
// if globalTracker is set to true.
|
||||||
|
// Only if the procedure is not in the global tracker we can delete the
|
||||||
|
// procedure. In other cases, the procedure may not be updated in a single
|
||||||
|
// log, we cannot delete it just because the log's track doesn't have
|
||||||
|
// any info for the procedure.
|
||||||
|
if (globalTracker) {
|
||||||
|
node.delete(procId);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// Only check delete in the global tracker, only global tracker has the
|
||||||
|
// whole picture
|
||||||
|
if (globalTracker && trackerNode.isDeleted(procId) == DeleteState.YES) {
|
||||||
|
node.delete(procId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (trackerNode.isModified(procId)) {
|
||||||
|
// the procedure was modified
|
||||||
node.delete(procId);
|
node.delete(procId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,7 +97,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
|
||||||
* will first be initialized to the oldest file's tracker(which is stored in the trailer), using the
|
* will first be initialized to the oldest file's tracker(which is stored in the trailer), using the
|
||||||
* method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merge it
|
* method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merge it
|
||||||
* with the tracker of every newer wal files, using the
|
* with the tracker of every newer wal files, using the
|
||||||
* {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. If we find out
|
* {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker, boolean)}.
|
||||||
|
* If we find out
|
||||||
* that all the modified procedures for the oldest wal file are modified or deleted in newer wal
|
* that all the modified procedures for the oldest wal file are modified or deleted in newer wal
|
||||||
* files, then we can delete it. This is because that, every time we call
|
* files, then we can delete it. This is because that, every time we call
|
||||||
* {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)}, we will
|
* {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)}, we will
|
||||||
|
@ -343,7 +344,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close the writer
|
// Close the writer
|
||||||
closeCurrentLogStream();
|
closeCurrentLogStream(abort);
|
||||||
|
|
||||||
// Close the old logs
|
// Close the old logs
|
||||||
// they should be already closed, this is just in case the load fails
|
// they should be already closed, this is just in case the load fails
|
||||||
|
@ -398,7 +399,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
public void recoverLease() throws IOException {
|
public void recoverLease() throws IOException {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
LOG.trace("Starting WAL Procedure Store lease recovery");
|
LOG.debug("Starting WAL Procedure Store lease recovery");
|
||||||
boolean afterFirstAttempt = false;
|
boolean afterFirstAttempt = false;
|
||||||
while (isRunning()) {
|
while (isRunning()) {
|
||||||
// Don't sleep before first attempt
|
// Don't sleep before first attempt
|
||||||
|
@ -433,7 +434,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.trace("Lease acquired for flushLogId={}", flushLogId);
|
LOG.debug("Lease acquired for flushLogId={}", flushLogId);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -451,7 +452,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
|
|
||||||
// Nothing to do, If we have only the current log.
|
// Nothing to do, If we have only the current log.
|
||||||
if (logs.size() == 1) {
|
if (logs.size() == 1) {
|
||||||
LOG.trace("No state logs to replay.");
|
LOG.debug("No state logs to replay.");
|
||||||
loader.setMaxProcId(0);
|
loader.setMaxProcId(0);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -983,7 +984,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
boolean rollWriterForTesting() throws IOException {
|
public boolean rollWriterForTesting() throws IOException {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
return rollWriter();
|
return rollWriter();
|
||||||
|
@ -1006,11 +1007,11 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
if (storeTracker.isEmpty()) {
|
if (storeTracker.isEmpty()) {
|
||||||
LOG.trace("no active procedures");
|
LOG.trace("no active procedures");
|
||||||
tryRollWriter();
|
tryRollWriter();
|
||||||
removeAllLogs(flushLogId - 1);
|
removeAllLogs(flushLogId - 1, "no active procedures");
|
||||||
} else {
|
} else {
|
||||||
if (storeTracker.isAllModified()) {
|
if (storeTracker.isAllModified()) {
|
||||||
LOG.trace("all the active procedures are in the latest log");
|
LOG.trace("all the active procedures are in the latest log");
|
||||||
removeAllLogs(flushLogId - 1);
|
removeAllLogs(flushLogId - 1, "all the active procedures are in the latest log");
|
||||||
}
|
}
|
||||||
|
|
||||||
// if the log size has exceeded the roll threshold
|
// if the log size has exceeded the roll threshold
|
||||||
|
@ -1091,7 +1092,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
closeCurrentLogStream();
|
closeCurrentLogStream(false);
|
||||||
|
|
||||||
storeTracker.resetModified();
|
storeTracker.resetModified();
|
||||||
stream = newStream;
|
stream = newStream;
|
||||||
|
@ -1124,7 +1125,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void closeCurrentLogStream() {
|
private void closeCurrentLogStream(boolean abort) {
|
||||||
if (stream == null || logs.isEmpty()) {
|
if (stream == null || logs.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1133,8 +1134,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
ProcedureWALFile log = logs.getLast();
|
ProcedureWALFile log = logs.getLast();
|
||||||
log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
|
log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
|
||||||
log.updateLocalTracker(storeTracker);
|
log.updateLocalTracker(storeTracker);
|
||||||
long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
|
if (!abort) {
|
||||||
log.addToSize(trailerSize);
|
long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
|
||||||
|
log.addToSize(trailerSize);
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Unable to write the trailer", e);
|
LOG.warn("Unable to write the trailer", e);
|
||||||
}
|
}
|
||||||
|
@ -1153,6 +1156,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
// We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
|
// We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
|
||||||
// once there is nothing holding the oldest WAL we can remove it.
|
// once there is nothing holding the oldest WAL we can remove it.
|
||||||
while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
|
while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
|
||||||
|
LOG.info("Remove the oldest log {}", logs.getFirst());
|
||||||
removeLogFile(logs.getFirst(), walArchiveDir);
|
removeLogFile(logs.getFirst(), walArchiveDir);
|
||||||
buildHoldingCleanupTracker();
|
buildHoldingCleanupTracker();
|
||||||
}
|
}
|
||||||
|
@ -1170,24 +1174,38 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
|
|
||||||
// compute the holding tracker.
|
// compute the holding tracker.
|
||||||
// - the first WAL is used for the 'updates'
|
// - the first WAL is used for the 'updates'
|
||||||
// - the other WALs are scanned to remove procs already in other wals.
|
// - the global tracker is passed in first to decide which procedures no longer
|
||||||
|
// exist, so we can mark them as deleted in holdingCleanupTracker.
|
||||||
|
// Only the global tracker has the whole picture here.
|
||||||
|
// - the other WALs are scanned to remove procs already updated in a newer wal.
|
||||||
|
// If it is updated in a newer wal, we can mark it as deleted in holdingCleanupTracker.
|
||||||
|
// But, we can not delete it if it was shown deleted in the newer wal, as said
|
||||||
|
// above.
|
||||||
// TODO: exit early if holdingCleanupTracker.isEmpty()
|
// TODO: exit early if holdingCleanupTracker.isEmpty()
|
||||||
holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
|
holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
|
||||||
holdingCleanupTracker.setDeletedIfModifiedInBoth(storeTracker);
|
//Passing in the global tracker, we can delete the procedures not in the global
|
||||||
|
//tracker, because they are deleted in the later logs
|
||||||
|
holdingCleanupTracker.setDeletedIfModifiedInBoth(storeTracker, true);
|
||||||
for (int i = 1, size = logs.size() - 1; i < size; ++i) {
|
for (int i = 1, size = logs.size() - 1; i < size; ++i) {
|
||||||
holdingCleanupTracker.setDeletedIfModifiedInBoth(logs.get(i).getTracker());
|
// Set globalTracker to false since a single log's tracker is passed in.
|
||||||
|
// Since a specific procedure may not show up in the log at all(not executed or
|
||||||
|
// updated during the time), we can not delete the procedure just because this log
|
||||||
|
// doesn't have the info of the procedure. We can delete the procedure only if
|
||||||
|
// in this log's tracker, it is clearly shown that the procedure is modified or deleted
|
||||||
|
// in the corresponding BitSetNode.
|
||||||
|
holdingCleanupTracker.setDeletedIfModifiedInBoth(logs.get(i).getTracker(), false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove all logs with logId <= {@code lastLogId}.
|
* Remove all logs with logId <= {@code lastLogId}.
|
||||||
*/
|
*/
|
||||||
private void removeAllLogs(long lastLogId) {
|
private void removeAllLogs(long lastLogId, String why) {
|
||||||
if (logs.size() <= 1) {
|
if (logs.size() <= 1) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.trace("Remove all state logs with ID less than {}", lastLogId);
|
LOG.info("Remove all state logs with ID less than {}, since {}", lastLogId, why);
|
||||||
|
|
||||||
boolean removed = false;
|
boolean removed = false;
|
||||||
while (logs.size() > 1) {
|
while (logs.size() > 1) {
|
||||||
|
|
|
@ -67,19 +67,37 @@ public class ProcedureTestingUtility {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
|
||||||
|
boolean abort, boolean startWorkers) throws Exception {
|
||||||
|
restart(procExecutor, false, true, null, null, null, abort, startWorkers);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
|
||||||
|
boolean abort) throws Exception {
|
||||||
|
restart(procExecutor, false, true, null, null, null, abort, true);
|
||||||
|
}
|
||||||
|
|
||||||
public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception {
|
public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception {
|
||||||
restart(procExecutor, false, true, null, null, null);
|
restart(procExecutor, false, true, null, null, null, false, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
|
public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
|
||||||
boolean abortOnCorruption) throws IOException {
|
boolean abortOnCorruption) throws IOException {
|
||||||
|
initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
|
||||||
|
boolean abortOnCorruption, boolean startWorkers) throws IOException {
|
||||||
procExecutor.init(numThreads, abortOnCorruption);
|
procExecutor.init(numThreads, abortOnCorruption);
|
||||||
procExecutor.startWorkers();
|
if (startWorkers) {
|
||||||
|
procExecutor.startWorkers();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
|
public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
|
||||||
boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
|
boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
|
||||||
Callable<Void> actionBeforeStartWorker, Callable<Void> startAction)
|
Callable<Void> actionBeforeStartWorker, Callable<Void> startAction,
|
||||||
|
boolean abort, boolean startWorkers)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
final ProcedureStore procStore = procExecutor.getStore();
|
final ProcedureStore procStore = procExecutor.getStore();
|
||||||
final int storeThreads = procExecutor.getCorePoolSize();
|
final int storeThreads = procExecutor.getCorePoolSize();
|
||||||
|
@ -93,7 +111,7 @@ public class ProcedureTestingUtility {
|
||||||
// stop
|
// stop
|
||||||
LOG.info("RESTART - Stop");
|
LOG.info("RESTART - Stop");
|
||||||
procExecutor.stop();
|
procExecutor.stop();
|
||||||
procStore.stop(false);
|
procStore.stop(abort);
|
||||||
if (stopAction != null) {
|
if (stopAction != null) {
|
||||||
stopAction.call();
|
stopAction.call();
|
||||||
}
|
}
|
||||||
|
@ -109,7 +127,9 @@ public class ProcedureTestingUtility {
|
||||||
if (actionBeforeStartWorker != null) {
|
if (actionBeforeStartWorker != null) {
|
||||||
actionBeforeStartWorker.call();
|
actionBeforeStartWorker.call();
|
||||||
}
|
}
|
||||||
procExecutor.startWorkers();
|
if (startWorkers) {
|
||||||
|
procExecutor.startWorkers();
|
||||||
|
}
|
||||||
if (startAction != null) {
|
if (startAction != null) {
|
||||||
startAction.call();
|
startAction.call();
|
||||||
}
|
}
|
||||||
|
@ -207,7 +227,7 @@ public class ProcedureTestingUtility {
|
||||||
NoopProcedureStore procStore = new NoopProcedureStore();
|
NoopProcedureStore procStore = new NoopProcedureStore();
|
||||||
ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore);
|
ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore);
|
||||||
procStore.start(1);
|
procStore.start(1);
|
||||||
initAndStartWorkers(procExecutor, 1, false);
|
initAndStartWorkers(procExecutor, 1, false, true);
|
||||||
try {
|
try {
|
||||||
return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -0,0 +1,242 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
|
||||||
|
@Category({MasterTests.class, SmallTests.class})
|
||||||
|
public class TestProcedureCleanup {
|
||||||
|
@ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
|
||||||
|
.forClass(TestProcedureCleanup.class);
|
||||||
|
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(TestProcedureCleanup.class);
|
||||||
|
private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
|
||||||
|
|
||||||
|
private static TestProcEnv procEnv;
|
||||||
|
private static WALProcedureStore procStore;
|
||||||
|
|
||||||
|
private static ProcedureExecutor<TestProcEnv> procExecutor;
|
||||||
|
|
||||||
|
private static HBaseCommonTestingUtility htu;
|
||||||
|
|
||||||
|
private static FileSystem fs;
|
||||||
|
private static Path testDir;
|
||||||
|
private static Path logDir;
|
||||||
|
|
||||||
|
private static class TestProcEnv {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createProcExecutor(String dir) throws Exception {
|
||||||
|
logDir = new Path(testDir, dir);
|
||||||
|
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
|
||||||
|
procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv,
|
||||||
|
procStore);
|
||||||
|
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
|
||||||
|
ProcedureTestingUtility
|
||||||
|
.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
htu = new HBaseCommonTestingUtility();
|
||||||
|
|
||||||
|
// NOTE: The executor will be created by each test
|
||||||
|
procEnv = new TestProcEnv();
|
||||||
|
testDir = htu.getDataTestDir();
|
||||||
|
fs = testDir.getFileSystem(htu.getConfiguration());
|
||||||
|
assertTrue(testDir.depth() > 1);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProcedureShouldNotCleanOnLoad() throws Exception {
|
||||||
|
createProcExecutor("testProcedureShouldNotCleanOnLoad");
|
||||||
|
final RootProcedure proc = new RootProcedure();
|
||||||
|
long rootProc = procExecutor.submitProcedure(proc);
|
||||||
|
LOG.info("Begin to execute " + rootProc);
|
||||||
|
// wait until the child procedure arrives
|
||||||
|
while(procExecutor.getProcedures().size() < 2) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor
|
||||||
|
.getProcedures().get(1);
|
||||||
|
// wait until the suspendProcedure executed
|
||||||
|
suspendProcedure.latch.countDown();
|
||||||
|
Thread.sleep(100);
|
||||||
|
// roll the procedure log
|
||||||
|
LOG.info("Begin to roll log ");
|
||||||
|
procStore.rollWriterForTesting();
|
||||||
|
LOG.info("finish to roll log ");
|
||||||
|
Thread.sleep(500);
|
||||||
|
LOG.info("begin to restart1 ");
|
||||||
|
ProcedureTestingUtility.restart(procExecutor, true);
|
||||||
|
LOG.info("finish to restart1 ");
|
||||||
|
Assert.assertTrue(procExecutor.getProcedure(rootProc) != null);
|
||||||
|
Thread.sleep(500);
|
||||||
|
LOG.info("begin to restart2 ");
|
||||||
|
ProcedureTestingUtility.restart(procExecutor, true);
|
||||||
|
LOG.info("finish to restart2 ");
|
||||||
|
Assert.assertTrue(procExecutor.getProcedure(rootProc) != null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProcedureUpdatedShouldClean() throws Exception {
|
||||||
|
createProcExecutor("testProcedureUpdatedShouldClean");
|
||||||
|
SuspendProcedure suspendProcedure = new SuspendProcedure();
|
||||||
|
long suspendProc = procExecutor.submitProcedure(suspendProcedure);
|
||||||
|
LOG.info("Begin to execute " + suspendProc);
|
||||||
|
suspendProcedure.latch.countDown();
|
||||||
|
Thread.sleep(500);
|
||||||
|
LOG.info("begin to restart1 ");
|
||||||
|
ProcedureTestingUtility.restart(procExecutor, true);
|
||||||
|
LOG.info("finish to restart1 ");
|
||||||
|
while(procExecutor.getProcedure(suspendProc) == null) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
// Wait until the suspendProc executed after restart
|
||||||
|
suspendProcedure = (SuspendProcedure) procExecutor.getProcedure(suspendProc);
|
||||||
|
suspendProcedure.latch.countDown();
|
||||||
|
Thread.sleep(500);
|
||||||
|
// Should be 1 log since the suspendProcedure is updated in the new log
|
||||||
|
Assert.assertTrue(procStore.getActiveLogs().size() == 1);
|
||||||
|
// restart procExecutor
|
||||||
|
LOG.info("begin to restart2");
|
||||||
|
// Restart the executor but do not start the workers.
|
||||||
|
// Otherwise, the suspendProcedure will soon be executed and the oldest log
|
||||||
|
// will be cleaned, leaving only the newest log.
|
||||||
|
ProcedureTestingUtility.restart(procExecutor, true, false);
|
||||||
|
LOG.info("finish to restart2");
|
||||||
|
// There should be two active logs
|
||||||
|
Assert.assertTrue(procStore.getActiveLogs().size() == 2);
|
||||||
|
procExecutor.startWorkers();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProcedureDeletedShouldClean() throws Exception {
|
||||||
|
createProcExecutor("testProcedureDeletedShouldClean");
|
||||||
|
WaitProcedure waitProcedure = new WaitProcedure();
|
||||||
|
long waitProce = procExecutor.submitProcedure(waitProcedure);
|
||||||
|
LOG.info("Begin to execute " + waitProce);
|
||||||
|
Thread.sleep(500);
|
||||||
|
LOG.info("begin to restart1 ");
|
||||||
|
ProcedureTestingUtility.restart(procExecutor, true);
|
||||||
|
LOG.info("finish to restart1 ");
|
||||||
|
while(procExecutor.getProcedure(waitProce) == null) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
// Release the waitProcedure's latch so it can finish executing after the restart
|
||||||
|
waitProcedure = (WaitProcedure) procExecutor.getProcedure(waitProce);
|
||||||
|
waitProcedure.latch.countDown();
|
||||||
|
Thread.sleep(500);
|
||||||
|
// Should be 1 log since the waitProcedure has finished in the new log
|
||||||
|
Assert.assertTrue(procStore.getActiveLogs().size() == 1);
|
||||||
|
// restart procExecutor
|
||||||
|
LOG.info("begin to restart2");
|
||||||
|
// Restart the executor but do not start the workers.
|
||||||
|
// Otherwise, the suspendProcedure will soon be executed and the oldest log
|
||||||
|
// will be cleaned, leaving only the newest log.
|
||||||
|
ProcedureTestingUtility.restart(procExecutor, true, false);
|
||||||
|
LOG.info("finish to restart2");
|
||||||
|
// There should be two active logs
|
||||||
|
Assert.assertTrue(procStore.getActiveLogs().size() == 2);
|
||||||
|
procExecutor.startWorkers();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class WaitProcedure
|
||||||
|
extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
|
||||||
|
public WaitProcedure() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
private CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Procedure[] execute(final TestProcEnv env)
|
||||||
|
throws ProcedureSuspendedException {
|
||||||
|
// Always wait here
|
||||||
|
LOG.info("wait here");
|
||||||
|
try {
|
||||||
|
latch.await();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
|
||||||
|
}
|
||||||
|
LOG.info("finished");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
|
||||||
|
public SuspendProcedure() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
private CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Procedure[] execute(final TestProcEnv env)
|
||||||
|
throws ProcedureSuspendedException {
|
||||||
|
// Always suspend the procedure
|
||||||
|
LOG.info("suspend here");
|
||||||
|
latch.countDown();
|
||||||
|
throw new ProcedureSuspendedException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
|
||||||
|
private boolean childSpwaned = false;
|
||||||
|
|
||||||
|
public RootProcedure() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Procedure[] execute(final TestProcEnv env)
|
||||||
|
throws ProcedureSuspendedException {
|
||||||
|
if (!childSpwaned) {
|
||||||
|
childSpwaned = true;
|
||||||
|
return new Procedure[] {new SuspendProcedure()};
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue