HBASE-21363 Rewrite the buildingHoldCleanupTracker method in WALProcedureStore
This commit is contained in:
parent
1f437ac221
commit
b2fcf765ae
|
@ -131,9 +131,11 @@ class BitSetNode {
|
||||||
|
|
||||||
public BitSetNode(BitSetNode other, boolean resetDelete) {
|
public BitSetNode(BitSetNode other, boolean resetDelete) {
|
||||||
this.start = other.start;
|
this.start = other.start;
|
||||||
this.partial = other.partial;
|
|
||||||
this.modified = other.modified.clone();
|
|
||||||
// The resetDelete will be set to true when building cleanup tracker.
|
// The resetDelete will be set to true when building cleanup tracker.
|
||||||
|
// as we will reset deleted flags for all the unmodified bits to 1, the partial flag is useless
|
||||||
|
// so set it to false for not confusing the developers when debugging.
|
||||||
|
this.partial = resetDelete ? false : other.partial;
|
||||||
|
this.modified = other.modified.clone();
|
||||||
// The intention here is that, if a procedure is not modified in this tracker, then we do not
|
// The intention here is that, if a procedure is not modified in this tracker, then we do not
|
||||||
// need to take care of it, so we will set deleted to true for these bits, i.e, if modified is
|
// need to take care of it, so we will set deleted to true for these bits, i.e, if modified is
|
||||||
// 0, then we set deleted to 1, otherwise keep it as is. So here, the equation is
|
// 0, then we set deleted to 1, otherwise keep it as is. So here, the equation is
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Arrays;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
import java.util.function.BiFunction;
|
||||||
import java.util.stream.LongStream;
|
import java.util.stream.LongStream;
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -87,7 +88,10 @@ public class ProcedureStoreTracker {
|
||||||
*/
|
*/
|
||||||
public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) {
|
public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) {
|
||||||
reset();
|
reset();
|
||||||
this.partial = tracker.partial;
|
// resetDelete will true if we are building the cleanup tracker, as we will reset deleted flags
|
||||||
|
// for all the unmodified bits to 1, the partial flag is useless so set it to false for not
|
||||||
|
// confusing the developers when debugging.
|
||||||
|
this.partial = resetDelete ? false : tracker.partial;
|
||||||
this.minModifiedProcId = tracker.minModifiedProcId;
|
this.minModifiedProcId = tracker.minModifiedProcId;
|
||||||
this.maxModifiedProcId = tracker.maxModifiedProcId;
|
this.maxModifiedProcId = tracker.maxModifiedProcId;
|
||||||
this.keepDeletes = tracker.keepDeletes;
|
this.keepDeletes = tracker.keepDeletes;
|
||||||
|
@ -197,49 +201,45 @@ public class ProcedureStoreTracker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private void setDeleteIf(ProcedureStoreTracker tracker,
|
||||||
* Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by
|
BiFunction<BitSetNode, Long, Boolean> func) {
|
||||||
* the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker},
|
|
||||||
* then we mark it as deleted.
|
|
||||||
* @see #setDeletedIfModified(long...)
|
|
||||||
*/
|
|
||||||
public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker, boolean globalTracker) {
|
|
||||||
BitSetNode trackerNode = null;
|
BitSetNode trackerNode = null;
|
||||||
for (BitSetNode node : map.values()) {
|
for (BitSetNode node : map.values()) {
|
||||||
final long minProcId = node.getStart();
|
long minProcId = node.getStart();
|
||||||
final long maxProcId = node.getEnd();
|
long maxProcId = node.getEnd();
|
||||||
for (long procId = minProcId; procId <= maxProcId; ++procId) {
|
for (long procId = minProcId; procId <= maxProcId; ++procId) {
|
||||||
if (!node.isModified(procId)) {
|
if (!node.isModified(procId)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
trackerNode = tracker.lookupClosestNode(trackerNode, procId);
|
trackerNode = tracker.lookupClosestNode(trackerNode, procId);
|
||||||
if (trackerNode == null || !trackerNode.contains(procId)) {
|
if (func.apply(trackerNode, procId)) {
|
||||||
// the procId is not exist in the track, we can only delete the proc
|
|
||||||
// if globalTracker set to true.
|
|
||||||
// Only if the procedure is not in the global tracker we can delete the
|
|
||||||
// the procedure. In other cases, the procedure may not update in a single
|
|
||||||
// log, we cannot delete it just because the log's track doesn't have
|
|
||||||
// any info for the procedure.
|
|
||||||
if (globalTracker) {
|
|
||||||
node.delete(procId);
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// Only check delete in the global tracker, only global tracker has the
|
|
||||||
// whole picture
|
|
||||||
if (globalTracker && trackerNode.isDeleted(procId) == DeleteState.YES) {
|
|
||||||
node.delete(procId);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (trackerNode.isModified(procId)) {
|
|
||||||
// the procedure was modified
|
|
||||||
node.delete(procId);
|
node.delete(procId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For the global tracker, we will use this method to build the holdingCleanupTracker, as the
|
||||||
|
* modified flags will be cleared after rolling so we only need to test the deleted flags.
|
||||||
|
* @see #setDeletedIfModifiedInBoth(ProcedureStoreTracker)
|
||||||
|
*/
|
||||||
|
public void setDeletedIfDeletedByThem(ProcedureStoreTracker tracker) {
|
||||||
|
setDeleteIf(tracker, (node, procId) -> node == null || !node.contains(procId) ||
|
||||||
|
node.isDeleted(procId) == DeleteState.YES);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by
|
||||||
|
* the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker},
|
||||||
|
* then we mark it as deleted.
|
||||||
|
* @see #setDeletedIfModified(long...)
|
||||||
|
*/
|
||||||
|
public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) {
|
||||||
|
setDeleteIf(tracker, (node, procId) -> node != null && node.isModified(procId));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* lookup the node containing the specified procId.
|
* lookup the node containing the specified procId.
|
||||||
* @param node cached node to check before doing a lookup
|
* @param node cached node to check before doing a lookup
|
||||||
|
|
|
@ -73,6 +73,17 @@ public final class ProcedureWALFormat {
|
||||||
|
|
||||||
private ProcedureWALFormat() {}
|
private ProcedureWALFormat() {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load all the procedures in these ProcedureWALFiles, and rebuild the given {@code tracker} if
|
||||||
|
* needed, i.e, the {@code tracker} is a partial one.
|
||||||
|
* <p/>
|
||||||
|
* The method in the give {@code loader} will be called at the end after we load all the
|
||||||
|
* procedures and construct the hierarchy.
|
||||||
|
* <p/>
|
||||||
|
* And we will call the {@link ProcedureStoreTracker#resetModified()} method for the given
|
||||||
|
* {@code tracker} before returning, as it will be used to track the next proc wal file's modified
|
||||||
|
* procedures.
|
||||||
|
*/
|
||||||
public static void load(Iterator<ProcedureWALFile> logs, ProcedureStoreTracker tracker,
|
public static void load(Iterator<ProcedureWALFile> logs, ProcedureStoreTracker tracker,
|
||||||
Loader loader) throws IOException {
|
Loader loader) throws IOException {
|
||||||
ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
|
ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
|
||||||
|
|
|
@ -134,9 +134,8 @@ public class ProcedureWALFormatReader {
|
||||||
}
|
}
|
||||||
procedureMap.merge(localProcedureMap);
|
procedureMap.merge(localProcedureMap);
|
||||||
}
|
}
|
||||||
if (localTracker.isPartial()) {
|
// Do not reset the partial flag for local tracker, as here the local tracker only know the
|
||||||
localTracker.setPartialFlag(false);
|
// procedures which are modified in this file.
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void finish() throws IOException {
|
public void finish() throws IOException {
|
||||||
|
|
|
@ -97,7 +97,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
|
||||||
* will first be initialized to the oldest file's tracker(which is stored in the trailer), using the
|
* will first be initialized to the oldest file's tracker(which is stored in the trailer), using the
|
||||||
* method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merge it
|
* method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merge it
|
||||||
* with the tracker of every newer wal files, using the
|
* with the tracker of every newer wal files, using the
|
||||||
* {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker, boolean)}.
|
* {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}.
|
||||||
* If we find out
|
* If we find out
|
||||||
* that all the modified procedures for the oldest wal file are modified or deleted in newer wal
|
* that all the modified procedures for the oldest wal file are modified or deleted in newer wal
|
||||||
* files, then we can delete it. This is because that, every time we call
|
* files, then we can delete it. This is because that, every time we call
|
||||||
|
@ -1174,26 +1174,25 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
|
|
||||||
// compute the holding tracker.
|
// compute the holding tracker.
|
||||||
// - the first WAL is used for the 'updates'
|
// - the first WAL is used for the 'updates'
|
||||||
// - the global tracker is passed in first to decide which procedures are not
|
// - the global tracker will be used to determine whether a procedure has been deleted
|
||||||
// exist anymore, so we can mark them as deleted in holdingCleanupTracker.
|
// - other trackers will be used to determine whether a procedure has been updated, as a deleted
|
||||||
// Only global tracker have the whole picture here.
|
// procedure can always be detected by checking the global tracker, we can save the deleted
|
||||||
// - the other WALs are scanned to remove procs already updated in a newer wal.
|
// checks when applying other trackers
|
||||||
// If it is updated in a newer wal, we can mark it as delelted in holdingCleanupTracker
|
|
||||||
// But, we can not delete it if it was shown deleted in the newer wal, as said
|
|
||||||
// above.
|
|
||||||
// TODO: exit early if holdingCleanupTracker.isEmpty()
|
|
||||||
holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
|
holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
|
||||||
//Passing in the global tracker, we can delete the procedures not in the global
|
holdingCleanupTracker.setDeletedIfDeletedByThem(storeTracker);
|
||||||
//tracker, because they are deleted in the later logs
|
// the logs is a linked list, so avoid calling get(index) on it.
|
||||||
holdingCleanupTracker.setDeletedIfModifiedInBoth(storeTracker, true);
|
Iterator<ProcedureWALFile> iter = logs.iterator();
|
||||||
for (int i = 1, size = logs.size() - 1; i < size; ++i) {
|
// skip the tracker for the first file when creating the iterator.
|
||||||
// Set deleteIfNotExists to false since a single log's tracker is passed in.
|
iter.next();
|
||||||
// Since a specific procedure may not show up in the log at all(not executed or
|
ProcedureStoreTracker tracker = iter.next().getTracker();
|
||||||
// updated during the time), we can not delete the procedure just because this log
|
// testing iter.hasNext after calling iter.next to skip applying the tracker for last file,
|
||||||
// don't have the info of the procedure. We can delete the procedure only if
|
// which is just the storeTracker above.
|
||||||
// in this log's tracker, it was cleanly showed that the procedure is modified or deleted
|
while (iter.hasNext()) {
|
||||||
// in the corresponding BitSetNode.
|
holdingCleanupTracker.setDeletedIfModifiedInBoth(tracker);
|
||||||
holdingCleanupTracker.setDeletedIfModifiedInBoth(logs.get(i).getTracker(), false);
|
if (holdingCleanupTracker.isEmpty()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
iter.next();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,8 +19,13 @@ package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Exchanger;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
@ -28,28 +33,32 @@ import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
|
||||||
|
|
||||||
@Category({MasterTests.class, SmallTests.class})
|
@Category({ MasterTests.class, SmallTests.class })
|
||||||
public class TestProcedureCleanup {
|
public class TestProcedureCleanup {
|
||||||
@ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
|
|
||||||
.forClass(TestProcedureCleanup.class);
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestProcedureCleanup.class);
|
||||||
|
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TestProcedureCleanup.class);
|
private static final Logger LOG = LoggerFactory.getLogger(TestProcedureCleanup.class);
|
||||||
private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
|
|
||||||
|
|
||||||
private static TestProcEnv procEnv;
|
private static final int PROCEDURE_EXECUTOR_SLOTS = 2;
|
||||||
|
|
||||||
private static WALProcedureStore procStore;
|
private static WALProcedureStore procStore;
|
||||||
|
|
||||||
private static ProcedureExecutor<TestProcEnv> procExecutor;
|
private static ProcedureExecutor<Void> procExecutor;
|
||||||
|
|
||||||
private static HBaseCommonTestingUtility htu;
|
private static HBaseCommonTestingUtility htu;
|
||||||
|
|
||||||
|
@ -57,43 +66,35 @@ public class TestProcedureCleanup {
|
||||||
private static Path testDir;
|
private static Path testDir;
|
||||||
private static Path logDir;
|
private static Path logDir;
|
||||||
|
|
||||||
private static class TestProcEnv {
|
@Rule
|
||||||
|
public final TestName name = new TestName();
|
||||||
|
|
||||||
}
|
private void createProcExecutor() throws Exception {
|
||||||
|
logDir = new Path(testDir, name.getMethodName());
|
||||||
private void createProcExecutor(String dir) throws Exception {
|
|
||||||
logDir = new Path(testDir, dir);
|
|
||||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
|
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
|
||||||
procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv,
|
procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore);
|
||||||
procStore);
|
|
||||||
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
|
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
|
||||||
ProcedureTestingUtility
|
ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true, true);
|
||||||
.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUp() throws Exception {
|
public static void setUp() throws Exception {
|
||||||
htu = new HBaseCommonTestingUtility();
|
htu = new HBaseCommonTestingUtility();
|
||||||
|
htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true);
|
||||||
// NOTE: The executor will be created by each test
|
// NOTE: The executor will be created by each test
|
||||||
procEnv = new TestProcEnv();
|
|
||||||
testDir = htu.getDataTestDir();
|
testDir = htu.getDataTestDir();
|
||||||
fs = testDir.getFileSystem(htu.getConfiguration());
|
fs = testDir.getFileSystem(htu.getConfiguration());
|
||||||
assertTrue(testDir.depth() > 1);
|
assertTrue(testDir.depth() > 1);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testProcedureShouldNotCleanOnLoad() throws Exception {
|
public void testProcedureShouldNotCleanOnLoad() throws Exception {
|
||||||
createProcExecutor("testProcedureShouldNotCleanOnLoad");
|
createProcExecutor();
|
||||||
final RootProcedure proc = new RootProcedure();
|
final RootProcedure proc = new RootProcedure();
|
||||||
long rootProc = procExecutor.submitProcedure(proc);
|
long rootProc = procExecutor.submitProcedure(proc);
|
||||||
LOG.info("Begin to execute " + rootProc);
|
LOG.info("Begin to execute " + rootProc);
|
||||||
// wait until the child procedure arrival
|
// wait until the child procedure arrival
|
||||||
while(procExecutor.getProcedures().size() < 2) {
|
htu.waitFor(10000, () -> procExecutor.getProcedures().size() >= 2);
|
||||||
Thread.sleep(100);
|
|
||||||
}
|
|
||||||
SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor
|
SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor
|
||||||
.getProcedures().get(1);
|
.getProcedures().get(1);
|
||||||
// wait until the suspendProcedure executed
|
// wait until the suspendProcedure executed
|
||||||
|
@ -107,17 +108,17 @@ public class TestProcedureCleanup {
|
||||||
LOG.info("begin to restart1 ");
|
LOG.info("begin to restart1 ");
|
||||||
ProcedureTestingUtility.restart(procExecutor, true);
|
ProcedureTestingUtility.restart(procExecutor, true);
|
||||||
LOG.info("finish to restart1 ");
|
LOG.info("finish to restart1 ");
|
||||||
Assert.assertTrue(procExecutor.getProcedure(rootProc) != null);
|
assertTrue(procExecutor.getProcedure(rootProc) != null);
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
LOG.info("begin to restart2 ");
|
LOG.info("begin to restart2 ");
|
||||||
ProcedureTestingUtility.restart(procExecutor, true);
|
ProcedureTestingUtility.restart(procExecutor, true);
|
||||||
LOG.info("finish to restart2 ");
|
LOG.info("finish to restart2 ");
|
||||||
Assert.assertTrue(procExecutor.getProcedure(rootProc) != null);
|
assertTrue(procExecutor.getProcedure(rootProc) != null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testProcedureUpdatedShouldClean() throws Exception {
|
public void testProcedureUpdatedShouldClean() throws Exception {
|
||||||
createProcExecutor("testProcedureUpdatedShouldClean");
|
createProcExecutor();
|
||||||
SuspendProcedure suspendProcedure = new SuspendProcedure();
|
SuspendProcedure suspendProcedure = new SuspendProcedure();
|
||||||
long suspendProc = procExecutor.submitProcedure(suspendProcedure);
|
long suspendProc = procExecutor.submitProcedure(suspendProcedure);
|
||||||
LOG.info("Begin to execute " + suspendProc);
|
LOG.info("Begin to execute " + suspendProc);
|
||||||
|
@ -126,15 +127,13 @@ public class TestProcedureCleanup {
|
||||||
LOG.info("begin to restart1 ");
|
LOG.info("begin to restart1 ");
|
||||||
ProcedureTestingUtility.restart(procExecutor, true);
|
ProcedureTestingUtility.restart(procExecutor, true);
|
||||||
LOG.info("finish to restart1 ");
|
LOG.info("finish to restart1 ");
|
||||||
while(procExecutor.getProcedure(suspendProc) == null) {
|
htu.waitFor(10000, () -> procExecutor.getProcedure(suspendProc) != null);
|
||||||
Thread.sleep(100);
|
|
||||||
}
|
|
||||||
// Wait until the suspendProc executed after restart
|
// Wait until the suspendProc executed after restart
|
||||||
suspendProcedure = (SuspendProcedure) procExecutor.getProcedure(suspendProc);
|
suspendProcedure = (SuspendProcedure) procExecutor.getProcedure(suspendProc);
|
||||||
suspendProcedure.latch.countDown();
|
suspendProcedure.latch.countDown();
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
// Should be 1 log since the suspendProcedure is updated in the new log
|
// Should be 1 log since the suspendProcedure is updated in the new log
|
||||||
Assert.assertTrue(procStore.getActiveLogs().size() == 1);
|
assertTrue(procStore.getActiveLogs().size() == 1);
|
||||||
// restart procExecutor
|
// restart procExecutor
|
||||||
LOG.info("begin to restart2");
|
LOG.info("begin to restart2");
|
||||||
// Restart the executor but do not start the workers.
|
// Restart the executor but do not start the workers.
|
||||||
|
@ -143,14 +142,14 @@ public class TestProcedureCleanup {
|
||||||
ProcedureTestingUtility.restart(procExecutor, true, false);
|
ProcedureTestingUtility.restart(procExecutor, true, false);
|
||||||
LOG.info("finish to restart2");
|
LOG.info("finish to restart2");
|
||||||
// There should be two active logs
|
// There should be two active logs
|
||||||
Assert.assertTrue(procStore.getActiveLogs().size() == 2);
|
assertTrue(procStore.getActiveLogs().size() == 2);
|
||||||
procExecutor.startWorkers();
|
procExecutor.startWorkers();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testProcedureDeletedShouldClean() throws Exception {
|
public void testProcedureDeletedShouldClean() throws Exception {
|
||||||
createProcExecutor("testProcedureDeletedShouldClean");
|
createProcExecutor();
|
||||||
WaitProcedure waitProcedure = new WaitProcedure();
|
WaitProcedure waitProcedure = new WaitProcedure();
|
||||||
long waitProce = procExecutor.submitProcedure(waitProcedure);
|
long waitProce = procExecutor.submitProcedure(waitProcedure);
|
||||||
LOG.info("Begin to execute " + waitProce);
|
LOG.info("Begin to execute " + waitProce);
|
||||||
|
@ -158,15 +157,13 @@ public class TestProcedureCleanup {
|
||||||
LOG.info("begin to restart1 ");
|
LOG.info("begin to restart1 ");
|
||||||
ProcedureTestingUtility.restart(procExecutor, true);
|
ProcedureTestingUtility.restart(procExecutor, true);
|
||||||
LOG.info("finish to restart1 ");
|
LOG.info("finish to restart1 ");
|
||||||
while(procExecutor.getProcedure(waitProce) == null) {
|
htu.waitFor(10000, () -> procExecutor.getProcedure(waitProce) != null);
|
||||||
Thread.sleep(100);
|
|
||||||
}
|
|
||||||
// Wait until the suspendProc executed after restart
|
// Wait until the suspendProc executed after restart
|
||||||
waitProcedure = (WaitProcedure) procExecutor.getProcedure(waitProce);
|
waitProcedure = (WaitProcedure) procExecutor.getProcedure(waitProce);
|
||||||
waitProcedure.latch.countDown();
|
waitProcedure.latch.countDown();
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
// Should be 1 log since the suspendProcedure is updated in the new log
|
// Should be 1 log since the suspendProcedure is updated in the new log
|
||||||
Assert.assertTrue(procStore.getActiveLogs().size() == 1);
|
assertTrue(procStore.getActiveLogs().size() == 1);
|
||||||
// restart procExecutor
|
// restart procExecutor
|
||||||
LOG.info("begin to restart2");
|
LOG.info("begin to restart2");
|
||||||
// Restart the executor but do not start the workers.
|
// Restart the executor but do not start the workers.
|
||||||
|
@ -175,12 +172,64 @@ public class TestProcedureCleanup {
|
||||||
ProcedureTestingUtility.restart(procExecutor, true, false);
|
ProcedureTestingUtility.restart(procExecutor, true, false);
|
||||||
LOG.info("finish to restart2");
|
LOG.info("finish to restart2");
|
||||||
// There should be two active logs
|
// There should be two active logs
|
||||||
Assert.assertTrue(procStore.getActiveLogs().size() == 2);
|
assertTrue(procStore.getActiveLogs().size() == 2);
|
||||||
procExecutor.startWorkers();
|
procExecutor.startWorkers();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class WaitProcedure
|
private void corrupt(FileStatus file) throws IOException {
|
||||||
extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
|
LOG.info("Corrupt " + file);
|
||||||
|
Path tmpFile = file.getPath().suffix(".tmp");
|
||||||
|
// remove the last byte to make the trailer corrupted
|
||||||
|
try (FSDataInputStream in = fs.open(file.getPath());
|
||||||
|
FSDataOutputStream out = fs.create(tmpFile)) {
|
||||||
|
ByteStreams.copy(ByteStreams.limit(in, file.getLen() - 1), out);
|
||||||
|
}
|
||||||
|
fs.delete(file.getPath(), false);
|
||||||
|
fs.rename(tmpFile, file.getPath());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static final class ExchangeProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
|
||||||
|
|
||||||
|
private final Exchanger<Boolean> exchanger = new Exchanger<>();
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Override
|
||||||
|
protected Procedure<Void>[] execute(Void env)
|
||||||
|
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||||
|
if (exchanger.exchange(Boolean.TRUE)) {
|
||||||
|
return new Procedure[] { this };
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testResetDeleteWhenBuildingHoldingCleanupTracker() throws Exception {
|
||||||
|
createProcExecutor();
|
||||||
|
ExchangeProcedure proc1 = new ExchangeProcedure();
|
||||||
|
ExchangeProcedure proc2 = new ExchangeProcedure();
|
||||||
|
procExecutor.submitProcedure(proc1);
|
||||||
|
long procId2 = procExecutor.submitProcedure(proc2);
|
||||||
|
Thread.sleep(500);
|
||||||
|
procStore.rollWriterForTesting();
|
||||||
|
proc1.exchanger.exchange(Boolean.TRUE);
|
||||||
|
Thread.sleep(500);
|
||||||
|
|
||||||
|
FileStatus[] walFiles = fs.listStatus(logDir);
|
||||||
|
Arrays.sort(walFiles, (f1, f2) -> f1.getPath().getName().compareTo(f2.getPath().getName()));
|
||||||
|
// corrupt the first proc wal file, so we will have a partial tracker for it after restarting
|
||||||
|
corrupt(walFiles[0]);
|
||||||
|
ProcedureTestingUtility.restart(procExecutor, false, true);
|
||||||
|
// also update proc2, which means that all the procedures in the first proc wal have been
|
||||||
|
// updated and it should be deleted.
|
||||||
|
proc2 = (ExchangeProcedure) procExecutor.getProcedure(procId2);
|
||||||
|
proc2.exchanger.exchange(Boolean.TRUE);
|
||||||
|
htu.waitFor(10000, () -> !fs.exists(walFiles[0].getPath()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class WaitProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
|
||||||
public WaitProcedure() {
|
public WaitProcedure() {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
@ -188,8 +237,7 @@ public class TestProcedureCleanup {
|
||||||
private CountDownLatch latch = new CountDownLatch(1);
|
private CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Procedure[] execute(final TestProcEnv env)
|
protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException {
|
||||||
throws ProcedureSuspendedException {
|
|
||||||
// Always wait here
|
// Always wait here
|
||||||
LOG.info("wait here");
|
LOG.info("wait here");
|
||||||
try {
|
try {
|
||||||
|
@ -202,7 +250,7 @@ public class TestProcedureCleanup {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
|
public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
|
||||||
public SuspendProcedure() {
|
public SuspendProcedure() {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
@ -210,8 +258,7 @@ public class TestProcedureCleanup {
|
||||||
private CountDownLatch latch = new CountDownLatch(1);
|
private CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Procedure[] execute(final TestProcEnv env)
|
protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException {
|
||||||
throws ProcedureSuspendedException {
|
|
||||||
// Always suspend the procedure
|
// Always suspend the procedure
|
||||||
LOG.info("suspend here");
|
LOG.info("suspend here");
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
|
@ -219,7 +266,7 @@ public class TestProcedureCleanup {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
|
public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<Void> {
|
||||||
private boolean childSpwaned = false;
|
private boolean childSpwaned = false;
|
||||||
|
|
||||||
public RootProcedure() {
|
public RootProcedure() {
|
||||||
|
@ -227,16 +274,13 @@ public class TestProcedureCleanup {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Procedure[] execute(final TestProcEnv env)
|
protected Procedure<Void>[] execute(Void env) throws ProcedureSuspendedException {
|
||||||
throws ProcedureSuspendedException {
|
|
||||||
if (!childSpwaned) {
|
if (!childSpwaned) {
|
||||||
childSpwaned = true;
|
childSpwaned = true;
|
||||||
return new Procedure[] {new SuspendProcedure()};
|
return new Procedure[] { new SuspendProcedure() };
|
||||||
} else {
|
} else {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue