HBASE-21254 Need to find a way to limit the number of proc wal files

Duo Zhang 2018-10-11 15:43:11 +08:00
parent da63ebb2c0
commit 9e9a1e0f0d
11 changed files with 382 additions and 103 deletions

View File

@ -31,6 +31,8 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -44,6 +46,7 @@ import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -56,6 +59,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
@ -347,6 +351,9 @@ public class ProcedureExecutor<TEnvironment> {
*/
private final ProcedureScheduler scheduler;
private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());
private final AtomicLong lastProcId = new AtomicLong(-1);
private final AtomicLong workerId = new AtomicLong(0);
private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
@ -369,6 +376,25 @@ public class ProcedureExecutor<TEnvironment> {
this(conf, environment, store, new SimpleProcedureScheduler());
}
private void forceUpdateProcedure(long procId) throws IOException {
IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
try {
Procedure<TEnvironment> proc = procedures.get(procId);
if (proc == null) {
LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
return;
}
if (proc.isFinished()) {
LOG.debug("Procedure {} has already been finished, skip force updating.", proc);
return;
}
LOG.debug("Force update procedure {}", proc);
store.update(proc);
} finally {
procExecutionLock.releaseLockEntry(lockEntry);
}
}
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
final ProcedureStore store, final ProcedureScheduler scheduler) {
this.environment = environment;
@ -377,7 +403,19 @@ public class ProcedureExecutor<TEnvironment> {
this.conf = conf;
this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
refreshConfiguration(conf);
store.registerListener(new ProcedureStoreListener() {
@Override
public void forceUpdate(long[] procIds) {
Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {
try {
forceUpdateProcedure(procId);
} catch (IOException e) {
LOG.warn("Failed to force update procedure with pid={}", procId);
}
}));
}
});
}
private void load(final boolean abortOnCorruption) throws IOException {
@ -1057,7 +1095,7 @@ public class ProcedureExecutor<TEnvironment> {
// Now, the procedure is not finished, and no one can execute it since we take the lock now
// And we can be sure that its ancestor is not running too, since their child has not
// finished yet
Procedure current = procedure;
Procedure<TEnvironment> current = procedure;
while (current != null) {
LOG.debug("Bypassing {}", current);
current.bypass();
@ -1988,7 +2026,6 @@ public class ProcedureExecutor<TEnvironment> {
public void sendStopSignal() {
scheduler.signalAll();
}
@Override
public void run() {
long lastUpdate = EnvironmentEdgeManager.currentTime();

View File

@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.procedure2.store;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState;
import org.apache.yetus.audience.InterfaceAudience;
@ -196,6 +198,27 @@ class BitSetNode {
return true;
}
/**
* @return all the active procedure ids in this bit set.
*/
public long[] getActiveProcIds() {
List<Long> procIds = new ArrayList<>();
for (int wordIndex = 0; wordIndex < modified.length; wordIndex++) {
if (deleted[wordIndex] == WORD_MASK || modified[wordIndex] == 0) {
// This should be the common case, where most procedures have been deleted.
continue;
}
long baseProcId = getStart() + (wordIndex << ADDRESS_BITS_PER_WORD);
for (int i = 0; i < (1 << ADDRESS_BITS_PER_WORD); i++) {
long mask = 1L << i;
if ((deleted[wordIndex] & mask) == 0 && (modified[wordIndex] & mask) != 0) {
procIds.add(baseProcId + i);
}
}
}
return procIds.stream().mapToLong(Long::longValue).toArray();
}
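For illustration only (not part of this patch), the word-level scan above can be exercised in isolation. The sketch below assumes 64-bit words (ADDRESS_BITS_PER_WORD = 6), a node whose first proc id is 0, and bit patterns invented for the example.
// Standalone sketch of the scan in getActiveProcIds(); the literals are made up.
public class ActiveBitScanSketch {
  public static void main(String[] args) {
    long start = 0L;
    long[] modified = { 0b1010L }; // proc ids 1 and 3 are tracked as modified
    long[] deleted = { ~0b1000L }; // every proc id is deleted except proc id 3
    for (int wordIndex = 0; wordIndex < modified.length; wordIndex++) {
      long baseProcId = start + ((long) wordIndex << 6);
      for (int i = 0; i < 64; i++) {
        long mask = 1L << i;
        // Active means: not deleted, and tracked as modified in this node.
        if ((deleted[wordIndex] & mask) == 0 && (modified[wordIndex] & mask) != 0) {
          System.out.println("active proc id: " + (baseProcId + i)); // prints 3
        }
      }
    }
  }
}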
/**
* @return true, if there are no active procedures in this BitSetNode, else false.
*/

View File

@ -34,19 +34,37 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
public interface ProcedureStore {
/**
* Store listener interface.
* <p/>
* The main process should register a listener and respond to the store events.
*/
public interface ProcedureStoreListener {
/**
* triggered when the store sync is completed.
*/
void postSync();
default void postSync() {
}
/**
* triggered when the store is not able to write out data.
* the main process should abort.
* triggered when the store is not able to write out data. The main process should abort.
*/
void abortProcess();
default void abortProcess() {
}
/**
* Suggest that the upper layer should update the state of some procedures. Ignoring this call
* will not affect correctness, only performance.
* <p/>
* For a WAL based ProcedureStore implementation, if all the procedures stored in a WAL file
* have been deleted, or updated later in another WAL file, then we can delete the WAL file. If
* there are old procedures in a WAL file which are never deleted or updated, then we can not
* delete the WAL file, and this will cause us to hold lots of WAL files and slow down master
* restart. So here we introduce this method to tell the upper layer to update the states of
* these procedures, so that we can delete the old WAL files.
* @param procIds the ids of the procedures
*/
default void forceUpdate(long[] procIds) {
}
}
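As a rough sketch of how an upper layer might consume this hook (mirroring the listener that ProcedureExecutor registers earlier in this commit; store is any ProcedureStore and rePersistProcedure is a hypothetical helper standing in for something like ProcedureExecutor.forceUpdateProcedure, which ends up calling store.update on the procedure):
// Illustration only: re-persist the named procedures so older wal files become deletable.
store.registerListener(new ProcedureStore.ProcedureStoreListener() {
  @Override
  public void forceUpdate(long[] procIds) {
    for (long procId : procIds) {
      // Hypothetical helper; keep this cheap or hand it off to a separate executor,
      // as ProcedureExecutor does with its forceUpdateExecutor.
      rePersistProcedure(procId);
    }
  }
});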
/**

View File

@ -15,12 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -58,19 +56,15 @@ public abstract class ProcedureStoreBase implements ProcedureStore {
return listeners.remove(listener);
}
protected void sendPostSyncSignal() {
if (!this.listeners.isEmpty()) {
for (ProcedureStoreListener listener : this.listeners) {
listener.postSync();
}
}
protected final void sendPostSyncSignal() {
listeners.forEach(ProcedureStoreListener::postSync);
}
protected void sendAbortProcessSignal() {
if (!this.listeners.isEmpty()) {
for (ProcedureStoreListener listener : this.listeners) {
listener.abortProcess();
}
}
protected final void sendAbortProcessSignal() {
listeners.forEach(ProcedureStoreListener::abortProcess);
}
protected final void sendForceUpdateSignal(long[] procIds) {
listeners.forEach(l -> l.forceUpdate(procIds));
}
}

View File

@ -23,9 +23,10 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.LongStream;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/**
@ -335,6 +336,16 @@ public class ProcedureStoreTracker {
return true;
}
/**
* Will be used when there are too many proc wal files. We will rewrite the states of the active
* procedures in the oldest proc wal file so that we can delete it.
* @return all the active procedure ids in this tracker.
*/
public long[] getAllActiveProcIds() {
return map.values().stream().map(BitSetNode::getActiveProcIds).filter(p -> p.length > 0)
.flatMapToLong(LongStream::of).toArray();
}
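For context, a short sketch of the intended call flow; holdingCleanupTracker, sendForceUpdateSignal and the wal-count warning threshold all appear in the WALProcedureStore changes later in this commit.
// Sketch: in WALProcedureStore, once there are more wal files than the warning threshold,
// the ids still tracked as active for the oldest wal are pushed to the store listeners.
if (logs.size() > walCountWarnThreshold) {
  sendForceUpdateSignal(holdingCleanupTracker.getAllActiveProcIds());
}
// ProcedureExecutor's listener then re-persists each procedure via store.update(), after
// which the oldest proc wal file no longer holds the only copy of their state.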
/**
* Clears the list of updated procedure ids. This doesn't affect global list of active
* procedure ids.

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
@ -98,7 +99,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
with the tracker of every newer wal file, using the
* {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. If we find out
* that all the modified procedures for the oldest wal file are modified or deleted in newer wal
* files, then we can delete it.
files, then we can delete it. This is because every time we call
* {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)}, we will
* persist the full state of a Procedure, so the earlier wal records for this procedure can all be
* deleted.
* @see ProcedureWALPrettyPrinter for printing content of a single WAL.
* @see #main(String[]) to parse a directory of MasterWALProcs.
*/
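The deletion criterion described above can be sketched roughly as below; the copy of the oldest tracker, the per-wal getTracker() accessor and removeOldestWalFile() are simplifications for illustration, not the exact helpers used by buildHoldingCleanupTracker.
// Rough sketch of the check described in the javadoc above; helper names are assumed.
ProcedureStoreTracker holding = copyOfOldestWalTracker(); // nothing marked deleted yet
for (ProcedureWALFile newer : newerWalFiles) {
  // Mark as deleted every procedure that was modified or deleted again later on.
  holding.setDeletedIfModifiedInBoth(newer.getTracker());
}
if (holding.isEmpty()) {
  // Every procedure from the oldest wal has a newer copy elsewhere, so the file can go.
  removeOldestWalFile();
}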
@ -116,7 +120,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY =
"hbase.procedure.store.wal.warn.threshold";
private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 64;
private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 10;
public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY =
"hbase.procedure.store.wal.exec.cleanup.on.load";
@ -496,7 +500,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
private void tryCleanupLogsOnLoad() {
// nothing to cleanup.
if (logs.size() <= 1) return;
if (logs.size() <= 1) {
return;
}
// the config says to not cleanup wals on load.
if (!conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY,
@ -967,7 +973,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
@VisibleForTesting
protected void periodicRollForTesting() throws IOException {
void periodicRollForTesting() throws IOException {
lock.lock();
try {
periodicRoll();
@ -977,7 +983,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
@VisibleForTesting
protected boolean rollWriterForTesting() throws IOException {
boolean rollWriterForTesting() throws IOException {
lock.lock();
try {
return rollWriter();
@ -987,7 +993,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
@VisibleForTesting
protected void removeInactiveLogsForTesting() throws Exception {
void removeInactiveLogsForTesting() throws Exception {
lock.lock();
try {
removeInactiveLogs();
@ -1041,7 +1047,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
@VisibleForTesting
boolean rollWriter(final long logId) throws IOException {
boolean rollWriter(long logId) throws IOException {
assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();
@ -1059,10 +1065,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
try {
newStream = CommonFSUtils.createForWal(fs, newLogFile, false);
} catch (FileAlreadyExistsException e) {
LOG.error("Log file with id=" + logId + " already exists", e);
LOG.error("Log file with id={} already exists", logId, e);
return false;
} catch (RemoteException re) {
LOG.warn("failed to create log file with id=" + logId, re);
LOG.warn("failed to create log file with id={}", logId, re);
return false;
}
// After we create the stream but before we attempt to use it at all
@ -1099,9 +1105,19 @@ public class WALProcedureStore extends ProcedureStoreBase {
if (logs.size() == 2) {
buildHoldingCleanupTracker();
} else if (logs.size() > walCountWarnThreshold) {
LOG.warn("procedure WALs count=" + logs.size() +
" above the warning threshold " + walCountWarnThreshold +
". check running procedures to see if something is stuck.");
LOG.warn("procedure WALs count={} above the warning threshold {}. check running procedures" +
" to see if something is stuck.", logs.size(), walCountWarnThreshold);
// This is just like what we do on the RS side when there are too many wal files. For an RS,
// if there are too many wal files, we find the regions with entries in the oldest file and
// tell the upper layer to flush them, so those wal entries become useless and we can delete
// the wal file. For WALProcedureStore, the assumption is that, if all the procedures recorded
// in a proc wal file are modified or deleted in a newer proc wal file, then we are safe to
// delete it. So here, if there are too many proc wal files, we find the procedure ids in the
// oldest file which are neither modified nor deleted in newer proc wal files, and tell the
// upper layer to update the state of these procedures into the newest proc wal file (by
// calling ProcedureStore.update), after which we are safe to delete the oldest proc wal file.
sendForceUpdateSignal(holdingCleanupTracker.getAllActiveProcIds());
}
LOG.info("Rolled new Procedure Store WAL, id={}", logId);

View File

@ -217,54 +217,8 @@ public class TestProcedureStoreTracker {
}
}
boolean isDeleted(ProcedureStoreTracker n, long procId) {
return n.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
}
boolean isDeleted(BitSetNode n, long procId) {
return n.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
}
/**
* @param active list of active proc ids. To mark them as non-deleted, since by default a proc
* id is always marked deleted.
*/
ProcedureStoreTracker buildTracker(long[] active, long[] updated, long[] deleted) {
ProcedureStoreTracker tracker = new ProcedureStoreTracker();
for (long i : active) {
tracker.insert(i);
}
tracker.resetModified();
for (long i : updated) {
tracker.update(i);
}
for (long i : deleted) {
tracker.delete(i);
}
return tracker;
}
/**
* @param active list of active proc ids. To mark them as non-deleted, since by default a proc
* id is always marked deleted.
*/
BitSetNode buildBitSetNode(long[] active, long[] updated, long[] deleted) {
BitSetNode bitSetNode = new BitSetNode(0L, false);
for (long i : active) {
bitSetNode.insertOrUpdate(i);
}
bitSetNode.resetModified();
for (long i : updated) {
bitSetNode.insertOrUpdate(i);
}
for (long i : deleted) {
bitSetNode.delete(i);
}
return bitSetNode;
}
@Test
public void testSetDeletedIfSet() {
public void testSetDeletedIfModified() {
final ProcedureStoreTracker tracker = new ProcedureStoreTracker();
final long[] procIds = new long[] { 1, 3, 7, 152, 512, 1024, 1025 };
@ -291,4 +245,20 @@ public class TestProcedureStoreTracker {
tracker.setDeletedIfModified(procIds);
assertEquals(true, tracker.isEmpty());
}
@Test
public void testGetActiveProcIds() {
ProcedureStoreTracker tracker = new ProcedureStoreTracker();
for (int i = 0; i < 10000; i++) {
tracker.insert(i * 10);
}
for (int i = 0; i < 10000; i += 2) {
tracker.delete(i * 10);
}
long[] activeProcIds = tracker.getAllActiveProcIds();
assertEquals(5000, activeProcIds.length);
for (int i = 0; i < 5000; i++) {
assertEquals((2 * i + 1) * 10, activeProcIds[i]);
}
}
}

View File

@ -0,0 +1,217 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Exchanger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
@Category({ MasterTests.class, SmallTests.class })
public class TestForceUpdateProcedure {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestForceUpdateProcedure.class);
private static HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
private static WALProcedureStore STORE;
private static ProcedureExecutor<Void> EXEC;
private static Exchanger<Boolean> EXCHANGER = new Exchanger<>();
private static int WAL_COUNT = 5;
private static void createStoreAndExecutor() throws IOException {
Path logDir = UTIL.getDataTestDir("proc-wals");
STORE = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir);
STORE.start(1);
EXEC = new ProcedureExecutor<Void>(UTIL.getConfiguration(), null, STORE);
ProcedureTestingUtility.initAndStartWorkers(EXEC, 1, true);
}
@BeforeClass
public static void setUp() throws IOException {
UTIL.getConfiguration().setInt(WALProcedureStore.WAL_COUNT_WARN_THRESHOLD_CONF_KEY, WAL_COUNT);
createStoreAndExecutor();
}
private static void stopStoreAndExecutor() {
EXEC.stop();
STORE.stop(false);
EXEC = null;
STORE = null;
}
@AfterClass
public static void tearDown() throws IOException {
stopStoreAndExecutor();
UTIL.cleanupTestDir();
}
public static final class WaitingProcedure extends Procedure<Void> {
@Override
protected Procedure<Void>[] execute(Void env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
EXCHANGER.exchange(Boolean.TRUE);
setState(ProcedureState.WAITING_TIMEOUT);
setTimeout(Integer.MAX_VALUE);
throw new ProcedureSuspendedException();
}
@Override
protected void rollback(Void env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected boolean abort(Void env) {
return false;
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
}
public static final class ParentProcedure extends Procedure<Void> {
@SuppressWarnings("unchecked")
@Override
protected Procedure<Void>[] execute(Void env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
return new Procedure[] { new WaitingProcedure() };
}
@Override
protected void rollback(Void env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected boolean abort(Void env) {
return false;
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
}
public static final class ExchangeProcedure extends Procedure<Void> {
@SuppressWarnings("unchecked")
@Override
protected Procedure<Void>[] execute(Void env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
if (EXCHANGER.exchange(Boolean.TRUE)) {
return new Procedure[] { this };
} else {
return null;
}
}
@Override
protected void rollback(Void env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected boolean abort(Void env) {
return false;
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
}
@Test
public void test() throws IOException, InterruptedException {
EXEC.submitProcedure(new ParentProcedure());
EXCHANGER.exchange(Boolean.TRUE);
UTIL.waitFor(10000, () -> EXEC.getActiveExecutorCount() == 0);
// The above operations are used to make sure that we have persisted the states of the two
// procedures.
long procId = EXEC.submitProcedure(new ExchangeProcedure());
assertEquals(1, STORE.getActiveLogs().size());
for (int i = 0; i < WAL_COUNT - 1; i++) {
assertTrue(STORE.rollWriterForTesting());
// The WaitingProcedure never gets updated, so we can not delete the oldest wal file and the
// number of wal files will increase
assertEquals(2 + i, STORE.getActiveLogs().size());
EXCHANGER.exchange(Boolean.TRUE);
Thread.sleep(1000);
}
STORE.rollWriterForTesting();
// Finish the ExchangeProcedure
EXCHANGER.exchange(Boolean.FALSE);
// Make sure that we can delete several wal files because we force updated the state of the
// WaitingProcedure. Notice that the last closed wal file can not be deleted: at roll time the
// newest wal file does not have anything in it, and the closed file still holds the state of
// the ExchangeProcedure, so it can not be deleted
UTIL.waitFor(10000, () -> STORE.getActiveLogs().size() <= 2);
UTIL.waitFor(10000, () -> EXEC.isFinished(procId));
// Make sure that after the force update we could still load the procedures
stopStoreAndExecutor();
createStoreAndExecutor();
Map<Class<?>, Procedure<Void>> procMap = new HashMap<>();
EXEC.getActiveProceduresNoCopy().forEach(p -> procMap.put(p.getClass(), p));
assertEquals(2, procMap.size());
ParentProcedure parentProc = (ParentProcedure) procMap.get(ParentProcedure.class);
assertEquals(ProcedureState.WAITING, parentProc.getState());
WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class);
assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState());
}
}

View File

@ -161,6 +161,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.quotas.MasterQuotasObserver;
@ -1480,7 +1481,13 @@ public class HMaster extends HRegionServer implements MasterServices {
MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
procedureStore =
new WALProcedureStore(conf, new MasterProcedureEnv.WALStoreLeaseRecovery(this));
procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
procedureStore.registerListener(new ProcedureStoreListener() {
@Override
public void abortProcess() {
abort("The Procedure Store lost the lease", null);
}
});
MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();
procedureExecutor = new ProcedureExecutor<>(conf, procEnv, procedureStore, procedureScheduler);
configurationManager.registerObserver(procEnv);

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
@ -73,26 +72,6 @@ public class MasterProcedureEnv implements ConfigurationObserver {
}
}
@InterfaceAudience.Private
public static class MasterProcedureStoreListener
implements ProcedureStore.ProcedureStoreListener {
private final MasterServices master;
public MasterProcedureStoreListener(final MasterServices master) {
this.master = master;
}
@Override
public void postSync() {
// no-op
}
@Override
public void abortProcess() {
master.abort("The Procedure Store lost the lease", null);
}
}
private final RSProcedureDispatcher remoteDispatcher;
private final MasterProcedureScheduler procSched;
private final MasterServices master;

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.zookeeper.KeeperException;
@ -220,7 +221,13 @@ public class MockMasterServices extends MockNoopMasterServices {
throws IOException {
final Configuration conf = getConfiguration();
this.procedureStore = new NoopProcedureStore();
this.procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
this.procedureStore.registerListener(new ProcedureStoreListener() {
@Override
public void abortProcess() {
abort("The Procedure Store lost the lease", null);
}
});
this.procedureEnv = new MasterProcedureEnv(this,
remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this));