HBASE-13993 WALProcedureStore fencing is not effective if new WAL rolls
This commit is contained in:
parent
a81b3c5afe
commit
2c076a30e5
|
@ -650,7 +650,22 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean rollWriter() throws IOException {
|
protected boolean rollWriter() throws IOException {
|
||||||
return rollWriter(flushLogId + 1);
|
// Create new state-log
|
||||||
|
if (!rollWriter(flushLogId + 1)) {
|
||||||
|
LOG.warn("someone else has already created log " + flushLogId);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We have the lease on the log,
|
||||||
|
// but we should check if someone else has created new files
|
||||||
|
if (getMaxLogId(getLogFiles()) > flushLogId) {
|
||||||
|
LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
|
||||||
|
logs.getLast().removeFile();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We have the lease on the log
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean rollWriter(final long logId) throws IOException {
|
private boolean rollWriter(final long logId) throws IOException {
|
||||||
|
|
|
@ -188,6 +188,10 @@ public class ProcedureTestingUtility {
|
||||||
public static class TestProcedure extends Procedure<Void> {
|
public static class TestProcedure extends Procedure<Void> {
|
||||||
public TestProcedure() {}
|
public TestProcedure() {}
|
||||||
|
|
||||||
|
public TestProcedure(long procId) {
|
||||||
|
this(procId, 0);
|
||||||
|
}
|
||||||
|
|
||||||
public TestProcedure(long procId, long parentId) {
|
public TestProcedure(long procId, long parentId) {
|
||||||
setProcId(procId);
|
setProcId(procId);
|
||||||
if (parentId > 0) {
|
if (parentId > 0) {
|
||||||
|
|
|
@ -35,9 +35,9 @@ import org.apache.hadoop.hbase.master.HMaster;
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
|
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.wal.TestWALProcedureStore.TestSequentialProcedure;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
|
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
|
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState;
|
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState;
|
||||||
|
@ -165,13 +165,32 @@ public class TestMasterFailoverWithProcedures {
|
||||||
backupStore3Abort.await();
|
backupStore3Abort.await();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=60000)
|
/**
|
||||||
|
* Tests proper fencing in case the current WAL store is fenced
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testWALfencingWithoutWALRolling() throws IOException {
|
||||||
|
testWALfencing(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests proper fencing in case the current WAL store does not receive writes until after the
|
||||||
|
* new WAL does a couple of WAL rolls.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
public void testWALfencingWithWALRolling() throws IOException {
|
public void testWALfencingWithWALRolling() throws IOException {
|
||||||
|
testWALfencing(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testWALfencing(boolean walRolls) throws IOException {
|
||||||
final ProcedureStore procStore = getMasterProcedureExecutor().getStore();
|
final ProcedureStore procStore = getMasterProcedureExecutor().getStore();
|
||||||
assertTrue("expected WALStore for this test", procStore instanceof WALProcedureStore);
|
assertTrue("expected WALStore for this test", procStore instanceof WALProcedureStore);
|
||||||
|
|
||||||
HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
|
HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
|
||||||
|
|
||||||
|
// cause WAL rolling after a delete in WAL:
|
||||||
|
firstMaster.getConfiguration().setLong("hbase.procedure.store.wal.roll.threshold", 1);
|
||||||
|
|
||||||
HMaster backupMaster3 = Mockito.mock(HMaster.class);
|
HMaster backupMaster3 = Mockito.mock(HMaster.class);
|
||||||
Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
|
Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
|
||||||
Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
|
Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
|
||||||
|
@ -185,20 +204,27 @@ public class TestMasterFailoverWithProcedures {
|
||||||
procStore2.start(1);
|
procStore2.start(1);
|
||||||
procStore2.recoverLease();
|
procStore2.recoverLease();
|
||||||
|
|
||||||
LOG.info("Inserting into second WALProcedureStore");
|
// before writing back to the WAL store, optionally do a couple of WAL rolls (which causes
|
||||||
// insert something to the second store then delete it, causing a WAL roll
|
// to delete the old WAL files).
|
||||||
Procedure proc2 = new TestSequentialProcedure();
|
if (walRolls) {
|
||||||
|
LOG.info("Inserting into second WALProcedureStore, causing WAL rolls");
|
||||||
|
for (int i = 0; i < 512; i++) {
|
||||||
|
// insert something to the second store then delete it, causing a WAL roll(s)
|
||||||
|
Procedure proc2 = new TestProcedure(i);
|
||||||
procStore2.insert(proc2, null);
|
procStore2.insert(proc2, null);
|
||||||
procStore2.rollWriterOrDie();
|
procStore2.delete(proc2.getProcId()); // delete the procedure so that the WAL is removed later
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now, insert something to the first store, should fail.
|
||||||
|
// If the store does a WAL roll and continue with another logId without checking higher logIds
|
||||||
|
// it will incorrectly succeed.
|
||||||
LOG.info("Inserting into first WALProcedureStore");
|
LOG.info("Inserting into first WALProcedureStore");
|
||||||
// insert something to the first store
|
|
||||||
proc2 = new TestSequentialProcedure();
|
|
||||||
try {
|
try {
|
||||||
procStore.insert(proc2, null);
|
procStore.insert(new TestProcedure(11), null);
|
||||||
fail("expected RuntimeException 'sync aborted'");
|
fail("Inserting into Procedure Store should have failed");
|
||||||
} catch (RuntimeException e) {
|
} catch (Exception ex) {
|
||||||
LOG.info("got " + e.getMessage());
|
LOG.info("Received expected exception", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -187,6 +187,7 @@ public class TestWALProcedureStoreOnHDFS {
|
||||||
UTIL.getDFSCluster().restartDataNode(dnCount);
|
UTIL.getDFSCluster().restartDataNode(dnCount);
|
||||||
for (long i = 2; i < 100; ++i) {
|
for (long i = 2; i < 100; ++i) {
|
||||||
store.insert(new TestProcedure(i, -1), null);
|
store.insert(new TestProcedure(i, -1), null);
|
||||||
|
waitForNumReplicas(3);
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
if ((i % 30) == 0) {
|
if ((i % 30) == 0) {
|
||||||
LOG.info("Restart Data Node");
|
LOG.info("Restart Data Node");
|
||||||
|
@ -195,4 +196,18 @@ public class TestWALProcedureStoreOnHDFS {
|
||||||
}
|
}
|
||||||
assertTrue(store.isRunning());
|
assertTrue(store.isRunning());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void waitForNumReplicas(int numReplicas) throws Exception {
|
||||||
|
while (UTIL.getDFSCluster().getDataNodes().size() < numReplicas) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < numReplicas; ++i) {
|
||||||
|
for (DataNode dn: UTIL.getDFSCluster().getDataNodes()) {
|
||||||
|
while (!dn.isDatanodeFullyStarted()) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue