HBASE-21351 The force update thread may have race with PE worker when the procedure is rolling back

This commit is contained in:
zhangduo 2018-11-02 19:56:16 +08:00
parent 25c964e9a3
commit 62fe365934
7 changed files with 459 additions and 341 deletions

View File

@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@ -42,20 +44,24 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@InterfaceAudience.Private
public class IdLock {
private static final Logger LOG = LoggerFactory.getLogger(IdLock.class);
/** An entry returned to the client as a lock object */
public static final class Entry {
private final long id;
private int numWaiters;
private boolean locked = true;
private Thread holder;
private Entry(long id) {
private Entry(long id, Thread holder) {
this.id = id;
this.holder = holder;
}
@Override
public String toString() {
return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
+ locked;
+ locked + ", holder=" + holder;
}
}
@ -70,7 +76,8 @@ public class IdLock {
* @throws IOException if interrupted
*/
public Entry getLockEntry(long id) throws IOException {
Entry entry = new Entry(id);
Thread currentThread = Thread.currentThread();
Entry entry = new Entry(id, currentThread);
Entry existing;
while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
synchronized (existing) {
@ -99,6 +106,7 @@ public class IdLock {
--existing.numWaiters; // Remove ourselves from waiters.
existing.locked = true;
existing.holder = currentThread;
return existing;
}
// If the entry is not locked, it might already be deleted from the
@ -120,7 +128,8 @@ public class IdLock {
*/
public Entry tryLockEntry(long id, long time) throws IOException {
Preconditions.checkArgument(time >= 0);
Entry entry = new Entry(id);
Thread currentThread = Thread.currentThread();
Entry entry = new Entry(id, currentThread);
Entry existing;
long waitUtilTS = System.currentTimeMillis() + time;
long remaining = time;
@ -158,6 +167,7 @@ public class IdLock {
--existing.numWaiters; // Remove ourselves from waiters.
}
existing.locked = true;
existing.holder = currentThread;
return existing;
}
// If the entry is not locked, it might already be deleted from the
@ -169,14 +179,17 @@ public class IdLock {
}
/**
* Must be called in a finally block to decrease the internal counter and
* remove the monitor object for the given id if the caller is the last
* client.
*
* Must be called in a finally block to decrease the internal counter and remove the monitor
* object for the given id if the caller is the last client.
* @param entry the return value of {@link #getLockEntry(long)}
*/
public void releaseLockEntry(Entry entry) {
Thread currentThread = Thread.currentThread();
synchronized (entry) {
if (entry.holder != currentThread) {
LOG.warn("{} is trying to release lock entry {}, but it is not the holder.", currentThread,
entry);
}
entry.locked = false;
if (entry.numWaiters > 0) {
entry.notify();
@ -186,7 +199,21 @@ public class IdLock {
}
}
/** For testing */
/**
* Test whether the given id is already locked by the current thread.
*/
public boolean isHeldByCurrentThread(long id) {
Thread currentThread = Thread.currentThread();
Entry entry = map.get(id);
if (entry == null) {
return false;
}
synchronized (entry) {
return currentThread.equals(entry.holder);
}
}
@VisibleForTesting
void assertMapEmpty() {
assert map.isEmpty();
}

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Internal cleaner that removes the completed procedure results after a TTL.
* <p/>
* NOTE: This is a special case handled in timeoutLoop().
* <p/>
* Since the client code looks more or less like:
*
* <pre>
* procId = master.doOperation()
* while (master.getProcResult(procId) == ProcInProgress);
* </pre>
*
* The master should not throw away the proc result as soon as the procedure is done but should wait
* a result request from the client (see executor.removeResult(procId)) The client will call
* something like master.isProcDone() or master.getProcResult() which will return the result/state
* to the client, and it will mark the completed proc as ready to delete. note that the client may
* not receive the response from the master (e.g. master failover) so, if we delay a bit the real
* deletion of the proc result the client will be able to get the result the next try.
*/
@InterfaceAudience.Private
class CompletedProcedureCleaner<TEnvironment> extends ProcedureInMemoryChore<TEnvironment> {
private static final Logger LOG = LoggerFactory.getLogger(CompletedProcedureCleaner.class);
static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
private static final int DEFAULT_BATCH_SIZE = 32;
private final Map<Long, CompletedProcedureRetainer<TEnvironment>> completed;
private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
private final ProcedureStore store;
private final IdLock procExecutionLock;
private Configuration conf;
public CompletedProcedureCleaner(Configuration conf, ProcedureStore store,
IdLock procExecutionLock, Map<Long, CompletedProcedureRetainer<TEnvironment>> completedMap,
Map<NonceKey, Long> nonceKeysToProcIdsMap) {
// set the timeout interval that triggers the periodic-procedure
super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
this.completed = completedMap;
this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
this.store = store;
this.procExecutionLock = procExecutionLock;
this.conf = conf;
}
@Override
protected void periodicExecute(final TEnvironment env) {
if (completed.isEmpty()) {
if (LOG.isTraceEnabled()) {
LOG.trace("No completed procedures to cleanup.");
}
return;
}
final long evictTtl =
conf.getInt(ProcedureExecutor.EVICT_TTL_CONF_KEY, ProcedureExecutor.DEFAULT_EVICT_TTL);
final long evictAckTtl = conf.getInt(ProcedureExecutor.EVICT_ACKED_TTL_CONF_KEY,
ProcedureExecutor.DEFAULT_ACKED_EVICT_TTL);
final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
final long[] batchIds = new long[batchSize];
int batchCount = 0;
final long now = EnvironmentEdgeManager.currentTime();
final Iterator<Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>>> it =
completed.entrySet().iterator();
while (it.hasNext() && store.isRunning()) {
final Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>> entry = it.next();
final CompletedProcedureRetainer<TEnvironment> retainer = entry.getValue();
final Procedure<?> proc = retainer.getProcedure();
IdLock.Entry lockEntry;
try {
lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
} catch (IOException e) {
// can only happen if interrupted, so not a big deal to propagate it
throw new UncheckedIOException(e);
}
try {
// TODO: Select TTL based on Procedure type
if (retainer.isExpired(now, evictTtl, evictAckTtl)) {
// Failed procedures aren't persisted in WAL.
if (!(proc instanceof FailedProcedure)) {
batchIds[batchCount++] = entry.getKey();
if (batchCount == batchIds.length) {
store.delete(batchIds, 0, batchCount);
batchCount = 0;
}
}
final NonceKey nonceKey = proc.getNonceKey();
if (nonceKey != null) {
nonceKeysToProcIdsMap.remove(nonceKey);
}
it.remove();
LOG.trace("Evict completed {}", proc);
}
} finally {
procExecutionLock.releaseLockEntry(lockEntry);
}
}
if (batchCount > 0) {
store.delete(batchIds, 0, batchCount);
}
}
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Hold the reference to a completed root procedure. Will be cleaned up after expired.
*/
@InterfaceAudience.Private
class CompletedProcedureRetainer<TEnvironment> {
private final Procedure<TEnvironment> procedure;
private long clientAckTime;
public CompletedProcedureRetainer(Procedure<TEnvironment> procedure) {
this.procedure = procedure;
clientAckTime = -1;
}
public Procedure<TEnvironment> getProcedure() {
return procedure;
}
public boolean hasClientAckTime() {
return clientAckTime != -1;
}
public long getClientAckTime() {
return clientAckTime;
}
public void setClientAckTime(long clientAckTime) {
this.clientAckTime = clientAckTime;
}
public boolean isExpired(long now, long evictTtl, long evictAckTtl) {
return (hasClientAckTime() && (now - getClientAckTime()) >= evictAckTtl) ||
(now - procedure.getLastUpdate()) >= evictTtl;
}
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
import java.util.Objects;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
@InterfaceAudience.Private
class FailedProcedure<TEnvironment> extends Procedure<TEnvironment> {
private String procName;
public FailedProcedure() {
}
public FailedProcedure(long procId, String procName, User owner, NonceKey nonceKey,
IOException exception) {
this.procName = procName;
setProcId(procId);
setState(ProcedureState.ROLLEDBACK);
setOwner(owner);
setNonceKey(nonceKey);
long currentTime = EnvironmentEdgeManager.currentTime();
setSubmittedTime(currentTime);
setLastUpdate(currentTime);
setFailure(Objects.toString(exception.getMessage(), ""), exception);
}
@Override
public String getProcName() {
return procName;
}
@Override
protected Procedure<TEnvironment>[] execute(TEnvironment env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected void rollback(TEnvironment env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected boolean abort(TEnvironment env) {
throw new UnsupportedOperationException();
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
}

View File

@ -18,16 +18,14 @@
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@ -87,6 +85,12 @@ public class ProcedureExecutor<TEnvironment> {
"hbase.procedure.worker.keep.alive.time.msec";
private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);
public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
public static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
/**
* {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to
* break PE having it fail at various junctures. When non-null, testing is set to an instance of
@ -155,134 +159,6 @@ public class ProcedureExecutor<TEnvironment> {
void procedureFinished(long procId);
}
private static final class CompletedProcedureRetainer<TEnvironment> {
private final Procedure<TEnvironment> procedure;
private long clientAckTime;
public CompletedProcedureRetainer(Procedure<TEnvironment> procedure) {
this.procedure = procedure;
clientAckTime = -1;
}
public Procedure<TEnvironment> getProcedure() {
return procedure;
}
public boolean hasClientAckTime() {
return clientAckTime != -1;
}
public long getClientAckTime() {
return clientAckTime;
}
public void setClientAckTime(long clientAckTime) {
this.clientAckTime = clientAckTime;
}
public boolean isExpired(long now, long evictTtl, long evictAckTtl) {
return (hasClientAckTime() && (now - getClientAckTime()) >= evictAckTtl) ||
(now - procedure.getLastUpdate()) >= evictTtl;
}
}
/**
* Internal cleaner that removes the completed procedure results after a TTL.
* NOTE: This is a special case handled in timeoutLoop().
*
* <p>Since the client code looks more or less like:
* <pre>
* procId = master.doOperation()
* while (master.getProcResult(procId) == ProcInProgress);
* </pre>
* The master should not throw away the proc result as soon as the procedure is done
* but should wait a result request from the client (see executor.removeResult(procId))
* The client will call something like master.isProcDone() or master.getProcResult()
* which will return the result/state to the client, and it will mark the completed
* proc as ready to delete. note that the client may not receive the response from
* the master (e.g. master failover) so, if we delay a bit the real deletion of
* the proc result the client will be able to get the result the next try.
*/
private static class CompletedProcedureCleaner<TEnvironment>
extends ProcedureInMemoryChore<TEnvironment> {
private static final Logger LOG = LoggerFactory.getLogger(CompletedProcedureCleaner.class);
private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
private static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
private static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
private static final int DEFAULT_BATCH_SIZE = 32;
private final Map<Long, CompletedProcedureRetainer<TEnvironment>> completed;
private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
private final ProcedureStore store;
private Configuration conf;
public CompletedProcedureCleaner(Configuration conf, final ProcedureStore store,
final Map<Long, CompletedProcedureRetainer<TEnvironment>> completedMap,
final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
// set the timeout interval that triggers the periodic-procedure
super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
this.completed = completedMap;
this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
this.store = store;
this.conf = conf;
}
@Override
protected void periodicExecute(final TEnvironment env) {
if (completed.isEmpty()) {
if (LOG.isTraceEnabled()) {
LOG.trace("No completed procedures to cleanup.");
}
return;
}
final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
final long[] batchIds = new long[batchSize];
int batchCount = 0;
final long now = EnvironmentEdgeManager.currentTime();
final Iterator<Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>>> it =
completed.entrySet().iterator();
while (it.hasNext() && store.isRunning()) {
final Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>> entry = it.next();
final CompletedProcedureRetainer<TEnvironment> retainer = entry.getValue();
final Procedure<?> proc = retainer.getProcedure();
// TODO: Select TTL based on Procedure type
if (retainer.isExpired(now, evictTtl, evictAckTtl)) {
// Failed procedures aren't persisted in WAL.
if (!(proc instanceof FailedProcedure)) {
batchIds[batchCount++] = entry.getKey();
if (batchCount == batchIds.length) {
store.delete(batchIds, 0, batchCount);
batchCount = 0;
}
}
final NonceKey nonceKey = proc.getNonceKey();
if (nonceKey != null) {
nonceKeysToProcIdsMap.remove(nonceKey);
}
it.remove();
LOG.trace("Evict completed {}", proc);
}
}
if (batchCount > 0) {
store.delete(batchIds, 0, batchCount);
}
}
}
/**
* Map the the procId returned by submitProcedure(), the Root-ProcID, to the Procedure.
* Once a Root-Procedure completes (success or failure), the result will be added to this map.
@ -385,15 +261,26 @@ public class ProcedureExecutor<TEnvironment> {
IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
try {
Procedure<TEnvironment> proc = procedures.get(procId);
if (proc == null) {
LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
return;
}
// For a sub procedure which root parent has not been finished, we still need to retain the
// wal even if the procedure itself is finished.
if (proc.isFinished() && (!proc.hasParent() || isRootFinished(proc))) {
LOG.debug("Procedure {} has already been finished, skip force updating.", proc);
return;
if (proc != null) {
if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {
LOG.debug("Procedure {} has already been finished and parent is succeeded," +
" skip force updating", proc);
return;
}
} else {
CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) {
LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
return;
}
long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
if (retainer.isExpired(System.currentTimeMillis(), evictTtl, evictAckTtl)) {
LOG.debug("Procedure {} has already been finished and expired, skip force updating",
procId);
return;
}
proc = retainer.getProcedure();
}
LOG.debug("Force update procedure {}", proc);
store.update(proc);
@ -731,7 +618,8 @@ public class ProcedureExecutor<TEnvironment> {
timeoutExecutor.add(new WorkerMonitor());
// Add completed cleaner chore
addChore(new CompletedProcedureCleaner<>(conf, store, completed, nonceKeysToProcIdsMap));
addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
nonceKeysToProcIdsMap));
}
public void stop() {
@ -918,59 +806,6 @@ public class ProcedureExecutor<TEnvironment> {
}
}
public static class FailedProcedure<TEnvironment> extends Procedure<TEnvironment> {
private String procName;
public FailedProcedure() {
}
public FailedProcedure(long procId, String procName, User owner,
NonceKey nonceKey, IOException exception) {
this.procName = procName;
setProcId(procId);
setState(ProcedureState.ROLLEDBACK);
setOwner(owner);
setNonceKey(nonceKey);
long currentTime = EnvironmentEdgeManager.currentTime();
setSubmittedTime(currentTime);
setLastUpdate(currentTime);
setFailure(Objects.toString(exception.getMessage(), ""), exception);
}
@Override
public String getProcName() {
return procName;
}
@Override
protected Procedure<TEnvironment>[] execute(TEnvironment env)
throws ProcedureYieldException, ProcedureSuspendedException,
InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected void rollback(TEnvironment env)
throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected boolean abort(TEnvironment env) {
throw new UnsupportedOperationException();
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer)
throws IOException {
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer)
throws IOException {
}
}
/**
* If the failure failed before submitting it, we may want to give back the
* same error to the requests with the same nonceKey.
@ -1616,53 +1451,74 @@ public class ProcedureExecutor<TEnvironment> {
int stackTail = subprocStack.size();
while (stackTail-- > 0) {
Procedure<TEnvironment> proc = subprocStack.get(stackTail);
// For the sub procedures which are successfully finished, we do not rollback them.
// Typically, if we want to rollback a procedure, we first need to rollback it, and then
// recursively rollback its ancestors. The state changes which are done by sub procedures
// should be handled by parent procedures when rolling back. For example, when rolling back a
// MergeTableProcedure, we will schedule new procedures to bring the offline regions online,
// instead of rolling back the original procedures which offlined the regions(in fact these
// procedures can not be rolled back...).
if (proc.isSuccess()) {
// Just do the cleanup work, without actually executing the rollback
IdLock.Entry lockEntry = null;
// Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need
// this check, as the worker will hold the lock before executing a procedure. This is the only
// place where we may hold two procedure execution locks, and there is a fence in the
// RootProcedureState where we can make sure that only one worker can execute the rollback of
// a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to
// prevent race between us and the force update thread.
if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) {
try {
lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
} catch (IOException e) {
// can only happen if interrupted, so not a big deal to propagate it
throw new UncheckedIOException(e);
}
}
try {
// For the sub procedures which are successfully finished, we do not rollback them.
// Typically, if we want to rollback a procedure, we first need to rollback it, and then
// recursively rollback its ancestors. The state changes which are done by sub procedures
// should be handled by parent procedures when rolling back. For example, when rolling back
// a MergeTableProcedure, we will schedule new procedures to bring the offline regions
// online, instead of rolling back the original procedures which offlined the regions(in
// fact these procedures can not be rolled back...).
if (proc.isSuccess()) {
// Just do the cleanup work, without actually executing the rollback
subprocStack.remove(stackTail);
cleanupAfterRollbackOneStep(proc);
continue;
}
LockState lockState = acquireLock(proc);
if (lockState != LockState.LOCK_ACQUIRED) {
// can't take a lock on the procedure, add the root-proc back on the
// queue waiting for the lock availability
return lockState;
}
lockState = executeRollback(proc);
releaseLock(proc, false);
boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
abortRollback |= !isRunning() || !store.isRunning();
// allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery.
if (abortRollback) {
return lockState;
}
subprocStack.remove(stackTail);
cleanupAfterRollbackOneStep(proc);
continue;
}
LockState lockState = acquireLock(proc);
if (lockState != LockState.LOCK_ACQUIRED) {
// can't take a lock on the procedure, add the root-proc back on the
// queue waiting for the lock availability
return lockState;
}
lockState = executeRollback(proc);
releaseLock(proc, false);
boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
abortRollback |= !isRunning() || !store.isRunning();
// if the procedure is kind enough to pass the slot to someone else, yield
// if the proc is already finished, do not yield
if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
return LockState.LOCK_YIELD_WAIT;
}
// allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery.
if (abortRollback) {
return lockState;
}
subprocStack.remove(stackTail);
// if the procedure is kind enough to pass the slot to someone else, yield
// if the proc is already finished, do not yield
if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
return LockState.LOCK_YIELD_WAIT;
}
if (proc != rootProc) {
execCompletionCleanup(proc);
if (proc != rootProc) {
execCompletionCleanup(proc);
}
} finally {
if (lockEntry != null) {
procExecutionLock.releaseLockEntry(lockEntry);
}
}
}
// Finalize the procedure state
LOG.info("Rolled back " + rootProc +
" exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()));
LOG.info("Rolled back {} exec-time={}", rootProc,
StringUtils.humanTimeDiff(rootProc.elapsedTime()));
procedureFinished(rootProc);
return LockState.LOCK_ACQUIRED;
}
@ -2046,6 +1902,11 @@ public class ProcedureExecutor<TEnvironment> {
return scheduler;
}
@VisibleForTesting
int getCompletedSize() {
return completed.size();
}
// ==========================================================================
// Worker Thread
// ==========================================================================

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.wal;
package org.apache.hadoop.hbase.procedure2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -27,19 +27,19 @@ import java.util.concurrent.Exchanger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
@ -60,8 +60,12 @@ public class TestForceUpdateProcedure {
private static int WAL_COUNT = 5;
private static void createStoreAndExecutor() throws IOException {
Path logDir = UTIL.getDataTestDir("proc-wals");
@Rule
public final TestName name = new TestName();
private void createStoreAndExecutor() throws IOException {
UTIL.getConfiguration().setInt(CompletedProcedureCleaner.CLEANER_INTERVAL_CONF_KEY, 1000);
Path logDir = UTIL.getDataTestDir(name.getMethodName());
STORE = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir);
STORE.start(1);
EXEC = new ProcedureExecutor<Void>(UTIL.getConfiguration(), null, STORE);
@ -69,12 +73,11 @@ public class TestForceUpdateProcedure {
}
@BeforeClass
public static void setUp() throws IOException {
public static void setUpBeforeClass() throws IOException {
UTIL.getConfiguration().setInt(WALProcedureStore.WAL_COUNT_WARN_THRESHOLD_CONF_KEY, WAL_COUNT);
createStoreAndExecutor();
}
private static void stopStoreAndExecutor() {
private void stopStoreAndExecutor() {
EXEC.stop();
STORE.stop(false);
EXEC = null;
@ -82,12 +85,21 @@ public class TestForceUpdateProcedure {
}
@AfterClass
public static void tearDown() throws IOException {
stopStoreAndExecutor();
public static void tearDownAfterClass() throws IOException {
UTIL.cleanupTestDir();
}
public static final class WaitingProcedure extends Procedure<Void> {
@Before
public void setUp() throws IOException {
createStoreAndExecutor();
}
@After
public void tearDown() {
stopStoreAndExecutor();
}
public static final class WaitingProcedure extends NoopProcedure<Void> {
@Override
protected Procedure<Void>[] execute(Void env)
@ -97,82 +109,19 @@ public class TestForceUpdateProcedure {
setTimeout(Integer.MAX_VALUE);
throw new ProcedureSuspendedException();
}
@Override
protected void rollback(Void env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected boolean abort(Void env) {
return false;
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
}
public static final class ParentProcedure extends Procedure<Void> {
public static final class ParentProcedure extends NoopProcedure<Void> {
@SuppressWarnings("unchecked")
@Override
protected Procedure<Void>[] execute(Void env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
return new Procedure[] { new DummyProcedure(), new WaitingProcedure() };
}
@Override
protected void rollback(Void env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected boolean abort(Void env) {
return false;
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
return new Procedure[] { new NoopProcedure<>(), new WaitingProcedure() };
}
}
public static final class DummyProcedure extends Procedure<Void> {
@Override
protected Procedure<Void>[] execute(Void env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
return null;
}
@Override
protected void rollback(Void env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected boolean abort(Void env) {
return false;
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
}
public static final class ExchangeProcedure extends Procedure<Void> {
public static final class ExchangeProcedure extends NoopProcedure<Void> {
@SuppressWarnings("unchecked")
@Override
@ -184,28 +133,18 @@ public class TestForceUpdateProcedure {
return null;
}
}
}
public static final class NoopNoAckProcedure extends NoopProcedure<Void> {
@Override
protected void rollback(Void env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected boolean abort(Void env) {
protected boolean shouldWaitClientAck(Void env) {
return false;
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
}
@Test
public void test() throws IOException, InterruptedException {
public void testProcedureStuck() throws IOException, InterruptedException {
EXEC.submitProcedure(new ParentProcedure());
EXCHANGER.exchange(Boolean.TRUE);
UTIL.waitFor(10000, () -> EXEC.getActiveExecutorCount() == 0);
@ -240,7 +179,26 @@ public class TestForceUpdateProcedure {
assertEquals(ProcedureState.WAITING, parentProc.getState());
WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class);
assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState());
DummyProcedure dummyProc = (DummyProcedure) procMap.get(DummyProcedure.class);
assertEquals(ProcedureState.SUCCESS, dummyProc.getState());
NoopProcedure<Void> noopProc = (NoopProcedure<Void>) procMap.get(NoopProcedure.class);
assertEquals(ProcedureState.SUCCESS, noopProc.getState());
}
@Test
public void testCompletedProcedure() throws InterruptedException, IOException {
long procId = EXEC.submitProcedure(new ExchangeProcedure());
EXCHANGER.exchange(Boolean.FALSE);
UTIL.waitFor(10000, () -> EXEC.isFinished(procId));
for (int i = 0; i < WAL_COUNT - 1; i++) {
assertTrue(STORE.rollWriterForTesting());
// The exchange procedure is completed but still not deleted yet so we can not delete the
// oldest wal file
long pid = EXEC.submitProcedure(new NoopNoAckProcedure());
assertEquals(2 + i, STORE.getActiveLogs().size());
UTIL.waitFor(10000, () -> EXEC.isFinished(pid));
}
// Only the exchange procedure can not be deleted
UTIL.waitFor(10000, () -> EXEC.getCompletedSize() == 1);
STORE.rollWriterForTesting();
UTIL.waitFor(10000, () -> STORE.getActiveLogs().size() <= 1);
}
}

View File

@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
* avoiding port contention if another local HBase instance is already running).
* <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
* setting it to true.
* For triggering pre commit
*/
@InterfaceAudience.Public
@SuppressWarnings("deprecation")