HBASE-21351 The force update thread may have race with PE worker when the procedure is rolling back
parent 1bd98bb608
commit 4f22397ad4
@@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@@ -42,20 +44,24 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 public class IdLock {
 
+  private static final Logger LOG = LoggerFactory.getLogger(IdLock.class);
+
   /** An entry returned to the client as a lock object */
   public static final class Entry {
     private final long id;
     private int numWaiters;
     private boolean locked = true;
+    private Thread holder;
 
-    private Entry(long id) {
+    private Entry(long id, Thread holder) {
       this.id = id;
+      this.holder = holder;
     }
 
     @Override
     public String toString() {
       return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
-          + locked;
+          + locked + ", holder=" + holder;
     }
   }
 
@@ -70,7 +76,8 @@ public class IdLock {
    * @throws IOException if interrupted
    */
  public Entry getLockEntry(long id) throws IOException {
-    Entry entry = new Entry(id);
+    Thread currentThread = Thread.currentThread();
+    Entry entry = new Entry(id, currentThread);
     Entry existing;
     while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
       synchronized (existing) {
@@ -99,6 +106,7 @@ public class IdLock {
 
           --existing.numWaiters;  // Remove ourselves from waiters.
           existing.locked = true;
+          existing.holder = currentThread;
           return existing;
         }
         // If the entry is not locked, it might already be deleted from the
@@ -120,7 +128,8 @@ public class IdLock {
    */
   public Entry tryLockEntry(long id, long time) throws IOException {
     Preconditions.checkArgument(time >= 0);
-    Entry entry = new Entry(id);
+    Thread currentThread = Thread.currentThread();
+    Entry entry = new Entry(id, currentThread);
     Entry existing;
     long waitUtilTS = System.currentTimeMillis() + time;
     long remaining = time;
@@ -158,6 +167,7 @@ public class IdLock {
           --existing.numWaiters;  // Remove ourselves from waiters.
         }
         existing.locked = true;
+        existing.holder = currentThread;
         return existing;
       }
       // If the entry is not locked, it might already be deleted from the
@@ -169,14 +179,17 @@ public class IdLock {
   }
 
   /**
-   * Must be called in a finally block to decrease the internal counter and
-   * remove the monitor object for the given id if the caller is the last
-   * client.
-   *
+   * Must be called in a finally block to decrease the internal counter and remove the monitor
+   * object for the given id if the caller is the last client.
    * @param entry the return value of {@link #getLockEntry(long)}
    */
   public void releaseLockEntry(Entry entry) {
+    Thread currentThread = Thread.currentThread();
     synchronized (entry) {
+      if (entry.holder != currentThread) {
+        LOG.warn("{} is trying to release lock entry {}, but it is not the holder.", currentThread,
+          entry);
+      }
       entry.locked = false;
       if (entry.numWaiters > 0) {
         entry.notify();
@@ -186,7 +199,21 @@ public class IdLock {
       }
     }
   }
 
-  /** For testing */
+  /**
+   * Test whether the given id is already locked by the current thread.
+   */
+  public boolean isHeldByCurrentThread(long id) {
+    Thread currentThread = Thread.currentThread();
+    Entry entry = map.get(id);
+    if (entry == null) {
+      return false;
+    }
+    synchronized (entry) {
+      return currentThread.equals(entry.holder);
+    }
+  }
+
+  @VisibleForTesting
   void assertMapEmpty() {
     assert map.isEmpty();
   }
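The hunks above give each IdLock entry a holder thread and a non-reentrant isHeldByCurrentThread() check. A minimal usage sketch, not part of this commit: only the IdLock class and its getLockEntry/releaseLockEntry/isHeldByCurrentThread methods come from the diff, the surrounding class and doWork helper are made up for illustration.

import java.io.IOException;
import org.apache.hadoop.hbase.util.IdLock;

public class IdLockUsageSketch {
  private final IdLock procExecutionLock = new IdLock();

  void runExclusively(long procId) throws IOException {
    // The IdLock is not reentrant, so check the holder before acquiring a second time.
    if (procExecutionLock.isHeldByCurrentThread(procId)) {
      doWork(procId);
      return;
    }
    IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
    try {
      doWork(procId);
    } finally {
      // Release on the acquiring thread; with this patch a foreign release logs a warning.
      procExecutionLock.releaseLockEntry(lockEntry);
    }
  }

  private void doWork(long procId) {
    // placeholder for per-procedure work
  }
}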
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.NonceKey;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal cleaner that removes the completed procedure results after a TTL.
+ * <p/>
+ * NOTE: This is a special case handled in timeoutLoop().
+ * <p/>
+ * Since the client code looks more or less like:
+ *
+ * <pre>
+ *   procId = master.doOperation()
+ *   while (master.getProcResult(procId) == ProcInProgress);
+ * </pre>
+ *
+ * The master should not throw away the proc result as soon as the procedure is done but should wait
+ * a result request from the client (see executor.removeResult(procId)) The client will call
+ * something like master.isProcDone() or master.getProcResult() which will return the result/state
+ * to the client, and it will mark the completed proc as ready to delete. note that the client may
+ * not receive the response from the master (e.g. master failover) so, if we delay a bit the real
+ * deletion of the proc result the client will be able to get the result the next try.
+ */
+@InterfaceAudience.Private
+class CompletedProcedureCleaner<TEnvironment> extends ProcedureInMemoryChore<TEnvironment> {
+  private static final Logger LOG = LoggerFactory.getLogger(CompletedProcedureCleaner.class);
+
+  static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
+  private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
+
+  private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
+  private static final int DEFAULT_BATCH_SIZE = 32;
+
+  private final Map<Long, CompletedProcedureRetainer<TEnvironment>> completed;
+  private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
+  private final ProcedureStore store;
+  private final IdLock procExecutionLock;
+  private Configuration conf;
+
+  public CompletedProcedureCleaner(Configuration conf, ProcedureStore store,
+      IdLock procExecutionLock, Map<Long, CompletedProcedureRetainer<TEnvironment>> completedMap,
+      Map<NonceKey, Long> nonceKeysToProcIdsMap) {
+    // set the timeout interval that triggers the periodic-procedure
+    super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
+    this.completed = completedMap;
+    this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
+    this.store = store;
+    this.procExecutionLock = procExecutionLock;
+    this.conf = conf;
+  }
+
+  @Override
+  protected void periodicExecute(final TEnvironment env) {
+    if (completed.isEmpty()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("No completed procedures to cleanup.");
+      }
+      return;
+    }
+
+    final long evictTtl =
+      conf.getInt(ProcedureExecutor.EVICT_TTL_CONF_KEY, ProcedureExecutor.DEFAULT_EVICT_TTL);
+    final long evictAckTtl = conf.getInt(ProcedureExecutor.EVICT_ACKED_TTL_CONF_KEY,
+      ProcedureExecutor.DEFAULT_ACKED_EVICT_TTL);
+    final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
+
+    final long[] batchIds = new long[batchSize];
+    int batchCount = 0;
+
+    final long now = EnvironmentEdgeManager.currentTime();
+    final Iterator<Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>>> it =
+      completed.entrySet().iterator();
+    while (it.hasNext() && store.isRunning()) {
+      final Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>> entry = it.next();
+      final CompletedProcedureRetainer<TEnvironment> retainer = entry.getValue();
+      final Procedure<?> proc = retainer.getProcedure();
+      IdLock.Entry lockEntry;
+      try {
+        lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
+      } catch (IOException e) {
+        // can only happen if interrupted, so not a big deal to propagate it
+        throw new UncheckedIOException(e);
+      }
+      try {
+        // TODO: Select TTL based on Procedure type
+        if (retainer.isExpired(now, evictTtl, evictAckTtl)) {
+          // Failed procedures aren't persisted in WAL.
+          if (!(proc instanceof FailedProcedure)) {
+            batchIds[batchCount++] = entry.getKey();
+            if (batchCount == batchIds.length) {
+              store.delete(batchIds, 0, batchCount);
+              batchCount = 0;
+            }
+          }
+          final NonceKey nonceKey = proc.getNonceKey();
+          if (nonceKey != null) {
+            nonceKeysToProcIdsMap.remove(nonceKey);
+          }
+          it.remove();
+          LOG.trace("Evict completed {}", proc);
+        }
+      } finally {
+        procExecutionLock.releaseLockEntry(lockEntry);
+      }
+    }
+    if (batchCount > 0) {
+      store.delete(batchIds, 0, batchCount);
+    }
+  }
+}
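The cleaner above is driven entirely by plain Configuration keys. A hedged tuning sketch, not part of this commit: the key strings are the constants shown in the diff ("hbase.procedure.cleaner.interval", the two evict TTLs and the batch size), while the concrete values are arbitrary examples such as a test might use.

import org.apache.hadoop.conf.Configuration;

public class CleanerConfigSketch {
  static Configuration shortRetention() {
    Configuration conf = new Configuration();
    conf.setInt("hbase.procedure.cleaner.interval", 1000);            // run the chore every second
    conf.setInt("hbase.procedure.cleaner.evict.ttl", 15 * 1000);      // evict completed procs after 15s
    conf.setInt("hbase.procedure.cleaner.acked.evict.ttl", 5 * 1000); // evict acked procs after 5s
    conf.setInt("hbase.procedure.cleaner.evict.batch.size", 8);       // delete from the store in batches of 8
    return conf;
  }
}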
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Hold the reference to a completed root procedure. Will be cleaned up after expired.
+ */
+@InterfaceAudience.Private
+class CompletedProcedureRetainer<TEnvironment> {
+  private final Procedure<TEnvironment> procedure;
+  private long clientAckTime;
+
+  public CompletedProcedureRetainer(Procedure<TEnvironment> procedure) {
+    this.procedure = procedure;
+    clientAckTime = -1;
+  }
+
+  public Procedure<TEnvironment> getProcedure() {
+    return procedure;
+  }
+
+  public boolean hasClientAckTime() {
+    return clientAckTime != -1;
+  }
+
+  public long getClientAckTime() {
+    return clientAckTime;
+  }
+
+  public void setClientAckTime(long clientAckTime) {
+    this.clientAckTime = clientAckTime;
+  }
+
+  public boolean isExpired(long now, long evictTtl, long evictAckTtl) {
+    return (hasClientAckTime() && (now - getClientAckTime()) >= evictAckTtl) ||
+      (now - procedure.getLastUpdate()) >= evictTtl;
+  }
+}
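A standalone restatement of the expiry rule above with worked numbers, not part of this commit: a retainer expires either when the client has acked and the ack is older than evictAckTtl, or when the procedure's last update is older than evictTtl. The helper below simply mirrors isExpired() with plain longs.

public class RetainerExpirySketch {
  static boolean isExpired(long now, long lastUpdate, long clientAckTime,
      long evictTtl, long evictAckTtl) {
    boolean hasClientAckTime = clientAckTime != -1;
    return (hasClientAckTime && (now - clientAckTime) >= evictAckTtl) ||
      (now - lastUpdate) >= evictTtl;
  }

  public static void main(String[] args) {
    long evictTtl = 15 * 60000;   // 15min, the DEFAULT_EVICT_TTL from ProcedureExecutor
    long evictAckTtl = 5 * 60000; // 5min, the DEFAULT_ACKED_EVICT_TTL
    long now = 1_000_000_000L;
    // Acked 6 minutes ago: expired via the ack TTL even though the proc updated recently.
    System.out.println(isExpired(now, now - 60_000, now - 6 * 60000, evictTtl, evictAckTtl)); // true
    // Never acked (-1), last update 10 minutes ago: still retained.
    System.out.println(isExpired(now, now - 10 * 60000, -1, evictTtl, evictAckTtl)); // false
  }
}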
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.util.Objects;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.NonceKey;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+@InterfaceAudience.Private
+class FailedProcedure<TEnvironment> extends Procedure<TEnvironment> {
+
+  private String procName;
+
+  public FailedProcedure() {
+  }
+
+  public FailedProcedure(long procId, String procName, User owner, NonceKey nonceKey,
+      IOException exception) {
+    this.procName = procName;
+    setProcId(procId);
+    setState(ProcedureState.ROLLEDBACK);
+    setOwner(owner);
+    setNonceKey(nonceKey);
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    setSubmittedTime(currentTime);
+    setLastUpdate(currentTime);
+    setFailure(Objects.toString(exception.getMessage(), ""), exception);
+  }
+
+  @Override
+  public String getProcName() {
+    return procName;
+  }
+
+  @Override
+  protected Procedure<TEnvironment>[] execute(TEnvironment env)
+      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected void rollback(TEnvironment env) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected boolean abort(TEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+  }
+}
@@ -18,16 +18,14 @@
 package org.apache.hadoop.hbase.procedure2;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -87,6 +85,12 @@ public class ProcedureExecutor<TEnvironment> {
       "hbase.procedure.worker.keep.alive.time.msec";
   private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);
 
+  public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
+  static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
+
+  public static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
+  static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
+
   /**
    * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to
    * break PE having it fail at various junctures. When non-null, testing is set to an instance of
@@ -155,134 +159,6 @@ public class ProcedureExecutor<TEnvironment> {
     void procedureFinished(long procId);
   }
 
-  private static final class CompletedProcedureRetainer<TEnvironment> {
-    private final Procedure<TEnvironment> procedure;
-    private long clientAckTime;
-
-    public CompletedProcedureRetainer(Procedure<TEnvironment> procedure) {
-      this.procedure = procedure;
-      clientAckTime = -1;
-    }
-
-    public Procedure<TEnvironment> getProcedure() {
-      return procedure;
-    }
-
-    public boolean hasClientAckTime() {
-      return clientAckTime != -1;
-    }
-
-    public long getClientAckTime() {
-      return clientAckTime;
-    }
-
-    public void setClientAckTime(long clientAckTime) {
-      this.clientAckTime = clientAckTime;
-    }
-
-    public boolean isExpired(long now, long evictTtl, long evictAckTtl) {
-      return (hasClientAckTime() && (now - getClientAckTime()) >= evictAckTtl) ||
-        (now - procedure.getLastUpdate()) >= evictTtl;
-    }
-  }
-
-  /**
-   * Internal cleaner that removes the completed procedure results after a TTL.
-   * NOTE: This is a special case handled in timeoutLoop().
-   *
-   * <p>Since the client code looks more or less like:
-   * <pre>
-   *   procId = master.doOperation()
-   *   while (master.getProcResult(procId) == ProcInProgress);
-   * </pre>
-   * The master should not throw away the proc result as soon as the procedure is done
-   * but should wait a result request from the client (see executor.removeResult(procId))
-   * The client will call something like master.isProcDone() or master.getProcResult()
-   * which will return the result/state to the client, and it will mark the completed
-   * proc as ready to delete. note that the client may not receive the response from
-   * the master (e.g. master failover) so, if we delay a bit the real deletion of
-   * the proc result the client will be able to get the result the next try.
-   */
-  private static class CompletedProcedureCleaner<TEnvironment>
-      extends ProcedureInMemoryChore<TEnvironment> {
-    private static final Logger LOG = LoggerFactory.getLogger(CompletedProcedureCleaner.class);
-
-    private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
-    private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
-
-    private static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
-    private static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
-
-    private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
-    private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
-
-    private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
-    private static final int DEFAULT_BATCH_SIZE = 32;
-
-    private final Map<Long, CompletedProcedureRetainer<TEnvironment>> completed;
-    private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
-    private final ProcedureStore store;
-    private Configuration conf;
-
-    public CompletedProcedureCleaner(Configuration conf, final ProcedureStore store,
-        final Map<Long, CompletedProcedureRetainer<TEnvironment>> completedMap,
-        final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
-      // set the timeout interval that triggers the periodic-procedure
-      super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
-      this.completed = completedMap;
-      this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
-      this.store = store;
-      this.conf = conf;
-    }
-
-    @Override
-    protected void periodicExecute(final TEnvironment env) {
-      if (completed.isEmpty()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("No completed procedures to cleanup.");
-        }
-        return;
-      }
-
-      final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
-      final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
-      final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
-
-      final long[] batchIds = new long[batchSize];
-      int batchCount = 0;
-
-      final long now = EnvironmentEdgeManager.currentTime();
-      final Iterator<Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>>> it =
-          completed.entrySet().iterator();
-      while (it.hasNext() && store.isRunning()) {
-        final Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>> entry = it.next();
-        final CompletedProcedureRetainer<TEnvironment> retainer = entry.getValue();
-        final Procedure<?> proc = retainer.getProcedure();
-
-        // TODO: Select TTL based on Procedure type
-        if (retainer.isExpired(now, evictTtl, evictAckTtl)) {
-          // Failed procedures aren't persisted in WAL.
-          if (!(proc instanceof FailedProcedure)) {
-            batchIds[batchCount++] = entry.getKey();
-            if (batchCount == batchIds.length) {
-              store.delete(batchIds, 0, batchCount);
-              batchCount = 0;
-            }
-          }
-          final NonceKey nonceKey = proc.getNonceKey();
-          if (nonceKey != null) {
-            nonceKeysToProcIdsMap.remove(nonceKey);
-          }
-          it.remove();
-          LOG.trace("Evict completed {}", proc);
-        }
-      }
-      if (batchCount > 0) {
-        store.delete(batchIds, 0, batchCount);
-      }
-    }
-  }
-
   /**
    * Map the the procId returned by submitProcedure(), the Root-ProcID, to the Procedure.
    * Once a Root-Procedure completes (success or failure), the result will be added to this map.
@@ -385,15 +261,26 @@ public class ProcedureExecutor<TEnvironment> {
     IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
     try {
       Procedure<TEnvironment> proc = procedures.get(procId);
-      if (proc == null) {
-        LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
-        return;
-      }
-      // For a sub procedure which root parent has not been finished, we still need to retain the
-      // wal even if the procedure itself is finished.
-      if (proc.isFinished() && (!proc.hasParent() || isRootFinished(proc))) {
-        LOG.debug("Procedure {} has already been finished, skip force updating.", proc);
-        return;
+      if (proc != null) {
+        if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {
+          LOG.debug("Procedure {} has already been finished and parent is succeeded," +
+            " skip force updating", proc);
+          return;
+        }
+      } else {
+        CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
+        if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) {
+          LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
+          return;
+        }
+        long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
+        long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
+        if (retainer.isExpired(System.currentTimeMillis(), evictTtl, evictAckTtl)) {
+          LOG.debug("Procedure {} has already been finished and expired, skip force updating",
+            procId);
+          return;
+        }
+        proc = retainer.getProcedure();
       }
       LOG.debug("Force update procedure {}", proc);
       store.update(proc);
@@ -731,7 +618,8 @@ public class ProcedureExecutor<TEnvironment> {
     timeoutExecutor.add(new WorkerMonitor());
 
     // Add completed cleaner chore
-    addChore(new CompletedProcedureCleaner<>(conf, store, completed, nonceKeysToProcIdsMap));
+    addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
+      nonceKeysToProcIdsMap));
   }
 
   public void stop() {
@@ -918,59 +806,6 @@ public class ProcedureExecutor<TEnvironment> {
     }
   }
 
-  public static class FailedProcedure<TEnvironment> extends Procedure<TEnvironment> {
-    private String procName;
-
-    public FailedProcedure() {
-    }
-
-    public FailedProcedure(long procId, String procName, User owner,
-        NonceKey nonceKey, IOException exception) {
-      this.procName = procName;
-      setProcId(procId);
-      setState(ProcedureState.ROLLEDBACK);
-      setOwner(owner);
-      setNonceKey(nonceKey);
-      long currentTime = EnvironmentEdgeManager.currentTime();
-      setSubmittedTime(currentTime);
-      setLastUpdate(currentTime);
-      setFailure(Objects.toString(exception.getMessage(), ""), exception);
-    }
-
-    @Override
-    public String getProcName() {
-      return procName;
-    }
-
-    @Override
-    protected Procedure<TEnvironment>[] execute(TEnvironment env)
-        throws ProcedureYieldException, ProcedureSuspendedException,
-        InterruptedException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected void rollback(TEnvironment env)
-        throws IOException, InterruptedException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean abort(TEnvironment env) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected void serializeStateData(ProcedureStateSerializer serializer)
-        throws IOException {
-    }
-
-    @Override
-    protected void deserializeStateData(ProcedureStateSerializer serializer)
-        throws IOException {
-    }
-  }
-
   /**
    * If the failure failed before submitting it, we may want to give back the
    * same error to the requests with the same nonceKey.
@@ -1616,53 +1451,74 @@ public class ProcedureExecutor<TEnvironment> {
     int stackTail = subprocStack.size();
     while (stackTail-- > 0) {
       Procedure<TEnvironment> proc = subprocStack.get(stackTail);
-      // For the sub procedures which are successfully finished, we do not rollback them.
-      // Typically, if we want to rollback a procedure, we first need to rollback it, and then
-      // recursively rollback its ancestors. The state changes which are done by sub procedures
-      // should be handled by parent procedures when rolling back. For example, when rolling back a
-      // MergeTableProcedure, we will schedule new procedures to bring the offline regions online,
-      // instead of rolling back the original procedures which offlined the regions(in fact these
-      // procedures can not be rolled back...).
-      if (proc.isSuccess()) {
-        // Just do the cleanup work, without actually executing the rollback
-        subprocStack.remove(stackTail);
-        cleanupAfterRollbackOneStep(proc);
-        continue;
-      }
-      LockState lockState = acquireLock(proc);
-      if (lockState != LockState.LOCK_ACQUIRED) {
-        // can't take a lock on the procedure, add the root-proc back on the
-        // queue waiting for the lock availability
-        return lockState;
-      }
-
-      lockState = executeRollback(proc);
-      releaseLock(proc, false);
-      boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
-      abortRollback |= !isRunning() || !store.isRunning();
-
-      // allows to kill the executor before something is stored to the wal.
-      // useful to test the procedure recovery.
-      if (abortRollback) {
-        return lockState;
-      }
-
-      subprocStack.remove(stackTail);
-
-      // if the procedure is kind enough to pass the slot to someone else, yield
-      // if the proc is already finished, do not yield
-      if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
-        return LockState.LOCK_YIELD_WAIT;
-      }
-
-      if (proc != rootProc) {
-        execCompletionCleanup(proc);
+      IdLock.Entry lockEntry = null;
+      // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need
+      // this check, as the worker will hold the lock before executing a procedure. This is the only
+      // place where we may hold two procedure execution locks, and there is a fence in the
+      // RootProcedureState where we can make sure that only one worker can execute the rollback of
+      // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to
+      // prevent race between us and the force update thread.
+      if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) {
+        try {
+          lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
+        } catch (IOException e) {
+          // can only happen if interrupted, so not a big deal to propagate it
+          throw new UncheckedIOException(e);
+        }
+      }
+      try {
+        // For the sub procedures which are successfully finished, we do not rollback them.
+        // Typically, if we want to rollback a procedure, we first need to rollback it, and then
+        // recursively rollback its ancestors. The state changes which are done by sub procedures
+        // should be handled by parent procedures when rolling back. For example, when rolling back
+        // a MergeTableProcedure, we will schedule new procedures to bring the offline regions
+        // online, instead of rolling back the original procedures which offlined the regions(in
+        // fact these procedures can not be rolled back...).
+        if (proc.isSuccess()) {
+          // Just do the cleanup work, without actually executing the rollback
+          subprocStack.remove(stackTail);
+          cleanupAfterRollbackOneStep(proc);
+          continue;
+        }
+        LockState lockState = acquireLock(proc);
+        if (lockState != LockState.LOCK_ACQUIRED) {
+          // can't take a lock on the procedure, add the root-proc back on the
+          // queue waiting for the lock availability
+          return lockState;
+        }
+
+        lockState = executeRollback(proc);
+        releaseLock(proc, false);
+        boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
+        abortRollback |= !isRunning() || !store.isRunning();
+
+        // allows to kill the executor before something is stored to the wal.
+        // useful to test the procedure recovery.
+        if (abortRollback) {
+          return lockState;
+        }
+
+        subprocStack.remove(stackTail);
+
+        // if the procedure is kind enough to pass the slot to someone else, yield
+        // if the proc is already finished, do not yield
+        if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
+          return LockState.LOCK_YIELD_WAIT;
+        }
+
+        if (proc != rootProc) {
+          execCompletionCleanup(proc);
+        }
+      } finally {
+        if (lockEntry != null) {
+          procExecutionLock.releaseLockEntry(lockEntry);
+        }
       }
     }
 
     // Finalize the procedure state
-    LOG.info("Rolled back " + rootProc +
-      " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()));
+    LOG.info("Rolled back {} exec-time={}", rootProc,
+      StringUtils.humanTimeDiff(rootProc.elapsedTime()));
     procedureFinished(rootProc);
     return LockState.LOCK_ACQUIRED;
   }
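The rollback hunk above is the core of the fix: before rolling back a step, the worker now takes the per-procedure execution lock unless the current thread already holds it, which closes the race with the force update thread. A sketch of the same acquire-if-not-held, release-only-what-you-acquired pattern outside ProcedureExecutor (illustrative only; the doRollbackStep helper is hypothetical, the IdLock calls are the ones from this commit):

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.hadoop.hbase.util.IdLock;

public class RollbackLockPatternSketch {
  private final IdLock procExecutionLock = new IdLock();

  void rollbackOneStep(long procId) {
    IdLock.Entry lockEntry = null;
    // IdLock is not reentrant, so only acquire when we are not already the holder.
    if (!procExecutionLock.isHeldByCurrentThread(procId)) {
      try {
        lockEntry = procExecutionLock.getLockEntry(procId);
      } catch (IOException e) {
        // getLockEntry only throws when interrupted
        throw new UncheckedIOException(e);
      }
    }
    try {
      doRollbackStep(procId); // hypothetical: the work that must run under the lock
    } finally {
      // Release only the entry we acquired here, never one held by an outer frame.
      if (lockEntry != null) {
        procExecutionLock.releaseLockEntry(lockEntry);
      }
    }
  }

  private void doRollbackStep(long procId) {
  }
}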
@@ -2047,6 +1903,11 @@ public class ProcedureExecutor<TEnvironment> {
     return scheduler;
   }
 
+  @VisibleForTesting
+  int getCompletedSize() {
+    return completed.size();
+  }
+
   // ==========================================================================
   //  Worker Thread
   // ==========================================================================
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.procedure2.store.wal;
+package org.apache.hadoop.hbase.procedure2;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -27,19 +27,19 @@ import java.util.concurrent.Exchanger;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
-import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
@@ -60,8 +60,12 @@ public class TestForceUpdateProcedure {
 
   private static int WAL_COUNT = 5;
 
-  private static void createStoreAndExecutor() throws IOException {
-    Path logDir = UTIL.getDataTestDir("proc-wals");
+  @Rule
+  public final TestName name = new TestName();
+
+  private void createStoreAndExecutor() throws IOException {
+    UTIL.getConfiguration().setInt(CompletedProcedureCleaner.CLEANER_INTERVAL_CONF_KEY, 1000);
+    Path logDir = UTIL.getDataTestDir(name.getMethodName());
     STORE = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir);
     STORE.start(1);
     EXEC = new ProcedureExecutor<Void>(UTIL.getConfiguration(), null, STORE);
@@ -69,12 +73,11 @@ public class TestForceUpdateProcedure {
   }
 
   @BeforeClass
-  public static void setUp() throws IOException {
+  public static void setUpBeforeClass() throws IOException {
     UTIL.getConfiguration().setInt(WALProcedureStore.WAL_COUNT_WARN_THRESHOLD_CONF_KEY, WAL_COUNT);
-    createStoreAndExecutor();
   }
 
-  private static void stopStoreAndExecutor() {
+  private void stopStoreAndExecutor() {
     EXEC.stop();
     STORE.stop(false);
     EXEC = null;
@@ -82,12 +85,21 @@ public class TestForceUpdateProcedure {
   }
 
   @AfterClass
-  public static void tearDown() throws IOException {
-    stopStoreAndExecutor();
+  public static void tearDownAfterClass() throws IOException {
     UTIL.cleanupTestDir();
   }
 
-  public static final class WaitingProcedure extends Procedure<Void> {
+  @Before
+  public void setUp() throws IOException {
+    createStoreAndExecutor();
+  }
+
+  @After
+  public void tearDown() {
+    stopStoreAndExecutor();
+  }
+
+  public static final class WaitingProcedure extends NoopProcedure<Void> {
 
     @Override
     protected Procedure<Void>[] execute(Void env)
@@ -97,82 +109,19 @@ public class TestForceUpdateProcedure {
       setTimeout(Integer.MAX_VALUE);
       throw new ProcedureSuspendedException();
     }
-
-    @Override
-    protected void rollback(Void env) throws IOException, InterruptedException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean abort(Void env) {
-      return false;
-    }
-
-    @Override
-    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
-    }
-
-    @Override
-    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
-    }
   }
 
-  public static final class ParentProcedure extends Procedure<Void> {
+  public static final class ParentProcedure extends NoopProcedure<Void> {
 
     @SuppressWarnings("unchecked")
     @Override
     protected Procedure<Void>[] execute(Void env)
         throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
-      return new Procedure[] { new DummyProcedure(), new WaitingProcedure() };
-    }
-
-    @Override
-    protected void rollback(Void env) throws IOException, InterruptedException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean abort(Void env) {
-      return false;
-    }
-
-    @Override
-    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
-    }
-
-    @Override
-    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+      return new Procedure[] { new NoopProcedure<>(), new WaitingProcedure() };
     }
   }
 
-  public static final class DummyProcedure extends Procedure<Void> {
-
-    @Override
-    protected Procedure<Void>[] execute(Void env)
-        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
-      return null;
-    }
-
-    @Override
-    protected void rollback(Void env) throws IOException, InterruptedException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean abort(Void env) {
-      return false;
-    }
-
-    @Override
-    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
-    }
-
-    @Override
-    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
-    }
-  }
-
-  public static final class ExchangeProcedure extends Procedure<Void> {
+  public static final class ExchangeProcedure extends NoopProcedure<Void> {
 
     @SuppressWarnings("unchecked")
     @Override
@@ -184,28 +133,18 @@ public class TestForceUpdateProcedure {
         return null;
       }
     }
+  }
+
+  public static final class NoopNoAckProcedure extends NoopProcedure<Void> {
 
     @Override
-    protected void rollback(Void env) throws IOException, InterruptedException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean abort(Void env) {
+    protected boolean shouldWaitClientAck(Void env) {
       return false;
     }
-
-    @Override
-    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
-    }
-
-    @Override
-    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
-    }
   }
 
   @Test
-  public void test() throws IOException, InterruptedException {
+  public void testProcedureStuck() throws IOException, InterruptedException {
     EXEC.submitProcedure(new ParentProcedure());
     EXCHANGER.exchange(Boolean.TRUE);
     UTIL.waitFor(10000, () -> EXEC.getActiveExecutorCount() == 0);
@@ -240,7 +179,26 @@ public class TestForceUpdateProcedure {
     assertEquals(ProcedureState.WAITING, parentProc.getState());
     WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class);
     assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState());
-    DummyProcedure dummyProc = (DummyProcedure) procMap.get(DummyProcedure.class);
-    assertEquals(ProcedureState.SUCCESS, dummyProc.getState());
+    NoopProcedure<Void> noopProc = (NoopProcedure<Void>) procMap.get(NoopProcedure.class);
+    assertEquals(ProcedureState.SUCCESS, noopProc.getState());
+  }
+
+  @Test
+  public void testCompletedProcedure() throws InterruptedException, IOException {
+    long procId = EXEC.submitProcedure(new ExchangeProcedure());
+    EXCHANGER.exchange(Boolean.FALSE);
+    UTIL.waitFor(10000, () -> EXEC.isFinished(procId));
+    for (int i = 0; i < WAL_COUNT - 1; i++) {
+      assertTrue(STORE.rollWriterForTesting());
+      // The exchange procedure is completed but still not deleted yet so we can not delete the
+      // oldest wal file
+      long pid = EXEC.submitProcedure(new NoopNoAckProcedure());
+      assertEquals(2 + i, STORE.getActiveLogs().size());
+      UTIL.waitFor(10000, () -> EXEC.isFinished(pid));
    }
+    // Only the exchange procedure can not be deleted
+    UTIL.waitFor(10000, () -> EXEC.getCompletedSize() == 1);
+    STORE.rollWriterForTesting();
+    UTIL.waitFor(10000, () -> STORE.getActiveLogs().size() <= 1);
   }
 }
@@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
  * avoiding port contention if another local HBase instance is already running).
  * <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
  * setting it to true.
+ * For triggering pre commit
  */
 @InterfaceAudience.Public
 @SuppressWarnings("deprecation")