HBASE-21490 WALProcedureStore may remove proc wal files still with active procedures

Signed-off-by: Allan Yang <allan163@apache.org>
Authored by Duo Zhang on 2018-11-19 11:03:52 +08:00; committed by stack
parent 83dc38a1df
commit 405bf5e638
4 changed files with 193 additions and 35 deletions
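
As a quick orientation, here is a minimal, self-contained toy (not HBase code; every name in it is invented) of the effect this commit prevents: if a failed load still leaves the newest WAL's tracker looking authoritative, the cleanup pass treats older proc WAL files as redundant and removes them even though they hold the only record of still-active procedures.

import java.util.HashSet;
import java.util.Set;

public class ProcWalCleanupSketch {

  public static void main(String[] args) {
    // Two proc WAL files: the older one is the only place where active procedure 42 is recorded.
    Set<Long> procsOnlyInOldWal = new HashSet<>(Set.of(42L));

    // Tracker of the newest WAL; "partial" means it must not be used for cleanup decisions yet.
    Set<Long> coveredByNewest = new HashSet<>();
    boolean partial = true;

    // Replay of the old logs fails half way (the new test below forces this via afterReplay).
    boolean loadSucceeded = false;

    // Pre-patch behaviour: the tracker state was finalized even though loading failed.
    boolean guardWithLoadingFlag = false; // flip to true for the patched behaviour
    if (loadSucceeded || !guardWithLoadingFlag) {
      partial = false;            // the tracker now claims to be authoritative
      coveredByNewest.add(42L);   // ...and to already cover procedure 42
    }

    // Cleanup: an old WAL is removable once the newest tracker fully covers its procedures.
    boolean removable = !partial && coveredByNewest.containsAll(procsOnlyInOldWal);
    System.out.println(removable
      ? "BUG: old proc WAL would be deleted while procedure 42 is still active"
      : "OK: old proc WAL is kept until the load really succeeds");
  }
}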

ProcedureStoreTracker.java

@@ -274,7 +274,8 @@ public class ProcedureStoreTracker {
     this.keepDeletes = false;
     this.partial = false;
     this.map.clear();
-    resetModified();
+    minModifiedProcId = Long.MAX_VALUE;
+    maxModifiedProcId = Long.MIN_VALUE;
   }
 
   public boolean isModified(long procId) {
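
The two new assignments are the usual sentinel-bounds idiom: with min set to Long.MAX_VALUE and max to Long.MIN_VALUE, the next tracked proc id re-establishes both bounds without an "empty" special case. A tiny illustration (not HBase code; the class and method names are mine, only the two field names mirror the diff):

public class ModifiedBoundsSketch {

  private long minModifiedProcId = Long.MAX_VALUE;
  private long maxModifiedProcId = Long.MIN_VALUE;

  void trackModified(long procId) {
    // The first id after a reset becomes both the minimum and the maximum.
    minModifiedProcId = Math.min(minModifiedProcId, procId);
    maxModifiedProcId = Math.max(maxModifiedProcId, procId);
  }

  boolean hasModified() {
    return minModifiedProcId <= maxModifiedProcId; // false right after a reset
  }

  public static void main(String[] args) {
    ModifiedBoundsSketch bounds = new ModifiedBoundsSketch();
    System.out.println(bounds.hasModified()); // false
    bounds.trackModified(42L);
    System.out.println(bounds.hasModified()); // true
  }
}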

ProcedureWALFormat.java

@@ -88,27 +88,24 @@ public final class ProcedureWALFormat {
       Loader loader) throws IOException {
     ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
     tracker.setKeepDeletes(true);
-    try {
-      // Ignore the last log which is current active log.
-      while (logs.hasNext()) {
-        ProcedureWALFile log = logs.next();
-        log.open();
-        try {
-          reader.read(log);
-        } finally {
-          log.close();
-        }
+    // Ignore the last log which is current active log.
+    while (logs.hasNext()) {
+      ProcedureWALFile log = logs.next();
+      log.open();
+      try {
+        reader.read(log);
+      } finally {
+        log.close();
       }
-      reader.finish();
-
-      // The tracker is now updated with all the procedures read from the logs
-      if (tracker.isPartial()) {
-        tracker.setPartialFlag(false);
-      }
-      tracker.resetModified();
-    } finally {
-      tracker.setKeepDeletes(false);
     }
+    reader.finish();
+
+    // The tracker is now updated with all the procedures read from the logs
+    if (tracker.isPartial()) {
+      tracker.setPartialFlag(false);
+    }
+    tracker.resetModified();
+    tracker.setKeepDeletes(false);
   }
 
   public static void writeHeader(OutputStream stream, ProcedureWALHeader header)
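
The point of dropping the try/finally is that the tracker must only be marked non-partial (and have its modified bits reset) after every log has been read successfully; on failure it now stays partial, so the caller will not trust it for cleanup. A generic sketch of that "finalize only on success" shape (not HBase code; names invented):

public class FinalizeOnSuccessSketch {

  private boolean partial = true;

  void load(Runnable riskyReplay) {
    riskyReplay.run(); // may throw; if it does, "partial" stays true
    partial = false;   // only reached on success (previously this ran in a finally block)
  }

  public static void main(String[] args) {
    FinalizeOnSuccessSketch store = new FinalizeOnSuccessSketch();
    try {
      store.load(() -> { throw new RuntimeException("replay failed"); });
    } catch (RuntimeException e) {
      System.out.println("load failed, partial=" + store.partial); // still true
    }
  }
}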

WALProcedureStore.java

@@ -448,13 +448,14 @@ public class WALProcedureStore extends ProcedureStoreBase {
     lock.lock();
     try {
       if (logs.isEmpty()) {
-        throw new RuntimeException("recoverLease() must be called before loading data");
+        throw new IllegalStateException("recoverLease() must be called before loading data");
       }
 
       // Nothing to do, If we have only the current log.
       if (logs.size() == 1) {
         LOG.debug("No state logs to replay.");
         loader.setMaxProcId(0);
+        loading.set(false);
         return;
       }
@@ -488,15 +489,20 @@ public class WALProcedureStore extends ProcedureStoreBase {
           // TODO: sideline corrupted log
         }
       });
+      // if we fail when loading, we should prevent persisting the storeTracker later in the stop
+      // method. As it may happen that, we have finished constructing the modified and deleted bits,
+      // but before we call resetModified, we fail, then if we persist the storeTracker then when
+      // restarting, we will consider that all procedures have been included in this file and delete
+      // all the previous files. Obviously this not correct. So here we will only set loading to
+      // false when we successfully loaded all the procedures, and when closing we will skip
+      // persisting the store tracker. And also, this will prevent the sync thread to do
+      // periodicRoll, where we may also clean old logs.
+      loading.set(false);
+      // try to cleanup inactive wals and complete the operation
+      buildHoldingCleanupTracker();
+      tryCleanupLogsOnLoad();
     } finally {
-      try {
-        // try to cleanup inactive wals and complete the operation
-        buildHoldingCleanupTracker();
-        tryCleanupLogsOnLoad();
-        loading.set(false);
-      } finally {
-        lock.unlock();
-      }
+      lock.unlock();
     }
   }
@@ -1133,11 +1139,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
     try {
       ProcedureWALFile log = logs.getLast();
-      log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
-      log.updateLocalTracker(storeTracker);
-      if (!abort) {
-        long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
-        log.addToSize(trailerSize);
+      // If the loading flag is true, it usually means that we fail when loading procedures, so we
+      // should not persist the store tracker, as its state may not be correct.
+      if (!loading.get()) {
+        log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
+        log.updateLocalTracker(storeTracker);
+        if (!abort) {
+          long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
+          log.addToSize(trailerSize);
+        }
       }
     } catch (IOException e) {
       LOG.warn("Unable to write the trailer", e);
@@ -1193,7 +1203,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       if (holdingCleanupTracker.isEmpty()) {
         break;
       }
-      iter.next();
+      tracker = iter.next().getTracker();
     }
   }
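
Taken together, the WALProcedureStore hunks above add a guard around shutdown: the loading flag is cleared only after a fully successful load, and the tracker/trailer is only persisted when that flag is clear. A simplified sketch of the guard (not HBase code; field and method names are invented and only loosely mirror the diff):

import java.util.concurrent.atomic.AtomicBoolean;

public class LoadingGuardSketch {

  private final AtomicBoolean loading = new AtomicBoolean(true);
  private boolean trackerPersisted;

  void load(Runnable replayOldLogs) {
    replayOldLogs.run(); // may throw; the flag then stays true
    loading.set(false);  // cleared only once everything has been replayed
  }

  void stop() {
    // Mirrors the trailer-writing hunk above: skip persisting state after a failed load.
    if (!loading.get()) {
      trackerPersisted = true;
    }
  }

  public static void main(String[] args) {
    LoadingGuardSketch store = new LoadingGuardSketch();
    try {
      store.load(() -> { throw new RuntimeException("injected load failure"); });
    } catch (RuntimeException e) {
      store.stop();
      System.out.println("tracker persisted after failed load? " + store.trackerPersisted); // false
    }
  }
}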

TestLoadProcedureError.java (new file)

@@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.master;

import static org.junit.Assert.assertFalse;

import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
/**
 * Testcase for HBASE-21490.
 */
@Category({ MasterTests.class, MediumTests.class })
public class TestLoadProcedureError {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLoadProcedureError.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static TableName NAME = TableName.valueOf("Load");

  private static volatile CountDownLatch ARRIVE;

  private static volatile boolean FINISH_PROC;

  private static volatile boolean FAIL_LOAD;

  public static final class TestProcedure extends NoopProcedure<MasterProcedureEnv>
      implements TableProcedureInterface {

    @Override
    protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
      if (ARRIVE != null) {
        ARRIVE.countDown();
        ARRIVE = null;
      }
      if (FINISH_PROC) {
        return null;
      }
      setTimeout(1000);
      setState(ProcedureState.WAITING_TIMEOUT);
      throw new ProcedureSuspendedException();
    }

    @Override
    protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
      setState(ProcedureState.RUNNABLE);
      env.getProcedureScheduler().addBack(this);
      return false;
    }

    @Override
    protected void afterReplay(MasterProcedureEnv env) {
      if (FAIL_LOAD) {
        throw new RuntimeException("Inject error");
      }
    }

    @Override
    public TableName getTableName() {
      return NAME;
    }

    @Override
    public TableOperationType getTableOperationType() {
      return TableOperationType.READ;
    }
  }

  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  private void waitNoMaster() {
    UTIL.waitFor(30000, () -> UTIL.getMiniHBaseCluster().getLiveMasterThreads().isEmpty());
  }

  @Test
  public void testLoadError() throws Exception {
    ProcedureExecutor<MasterProcedureEnv> procExec =
      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
    ARRIVE = new CountDownLatch(1);
    long procId = procExec.submitProcedure(new TestProcedure());
    ARRIVE.await();
    FAIL_LOAD = true;
    // do not persist the store tracker
    UTIL.getMiniHBaseCluster().getMaster().getWalProcedureStore().stop(true);
    UTIL.getMiniHBaseCluster().getMaster().abort("for testing");
    waitNoMaster();
    // restart twice, and should fail twice, as we will throw an exception in the afterReplay above
    // in order to reproduce the problem in HBASE-21490 stably, here we will wait until a master is
    // fully done, before starting the new master, otherwise the new master may start too early and
    // call recoverLease on the proc wal files and cause we fail to persist the store tracker when
    // shutting down
    UTIL.getMiniHBaseCluster().startMaster();
    waitNoMaster();
    UTIL.getMiniHBaseCluster().startMaster();
    waitNoMaster();
    FAIL_LOAD = false;
    HMaster master = UTIL.getMiniHBaseCluster().startMaster().getMaster();
    UTIL.waitFor(30000, () -> master.isActiveMaster() && master.isInitialized());
    // assert the procedure is still there and not finished yet
    TestProcedure proc = (TestProcedure) master.getMasterProcedureExecutor().getProcedure(procId);
    assertFalse(proc.isFinished());
    FINISH_PROC = true;
    UTIL.waitFor(30000, () -> proc.isFinished());
  }
}