diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
index c4adfbff154..9e5692feebb 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
@@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@@ -42,20 +44,24 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@InterfaceAudience.Private
public class IdLock {
+ private static final Logger LOG = LoggerFactory.getLogger(IdLock.class);
+
/** An entry returned to the client as a lock object */
public static final class Entry {
private final long id;
private int numWaiters;
private boolean locked = true;
+ private Thread holder;
- private Entry(long id) {
+ private Entry(long id, Thread holder) {
this.id = id;
+ this.holder = holder;
}
@Override
public String toString() {
return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
- + locked;
+ + locked + ", holder=" + holder;
}
}
@@ -70,7 +76,8 @@ public class IdLock {
* @throws IOException if interrupted
*/
public Entry getLockEntry(long id) throws IOException {
- Entry entry = new Entry(id);
+ Thread currentThread = Thread.currentThread();
+ Entry entry = new Entry(id, currentThread);
Entry existing;
while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
synchronized (existing) {
@@ -99,6 +106,7 @@ public class IdLock {
--existing.numWaiters; // Remove ourselves from waiters.
existing.locked = true;
+ existing.holder = currentThread;
return existing;
}
// If the entry is not locked, it might already be deleted from the
@@ -120,7 +128,8 @@ public class IdLock {
*/
public Entry tryLockEntry(long id, long time) throws IOException {
Preconditions.checkArgument(time >= 0);
- Entry entry = new Entry(id);
+ Thread currentThread = Thread.currentThread();
+ Entry entry = new Entry(id, currentThread);
Entry existing;
long waitUtilTS = System.currentTimeMillis() + time;
long remaining = time;
@@ -158,6 +167,7 @@ public class IdLock {
--existing.numWaiters; // Remove ourselves from waiters.
}
existing.locked = true;
+ existing.holder = currentThread;
return existing;
}
// If the entry is not locked, it might already be deleted from the
@@ -169,14 +179,17 @@ public class IdLock {
}
/**
- * Must be called in a finally block to decrease the internal counter and
- * remove the monitor object for the given id if the caller is the last
- * client.
- *
+ * Must be called in a finally block to decrease the internal counter and remove the monitor
+ * object for the given id if the caller is the last client.
* @param entry the return value of {@link #getLockEntry(long)}
*/
public void releaseLockEntry(Entry entry) {
+ Thread currentThread = Thread.currentThread();
synchronized (entry) {
+ if (entry.holder != currentThread) {
+ LOG.warn("{} is trying to release lock entry {}, but it is not the holder.", currentThread,
+ entry);
+ }
entry.locked = false;
if (entry.numWaiters > 0) {
entry.notify();
@@ -186,7 +199,21 @@ public class IdLock {
}
}
- /** For testing */
+ /**
+ * Test whether the given id is already locked by the current thread.
+ */
+ public boolean isHeldByCurrentThread(long id) {
+ Thread currentThread = Thread.currentThread();
+ Entry entry = map.get(id);
+ if (entry == null) {
+ return false;
+ }
+ synchronized (entry) {
+ return currentThread.equals(entry.holder);
+ }
+ }
+
+ @VisibleForTesting
void assertMapEmpty() {
assert map.isEmpty();
}
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureCleaner.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureCleaner.java
new file mode 100644
index 00000000000..e51b77b662b
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureCleaner.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.NonceKey;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal cleaner that removes the completed procedure results after a TTL.
+ *
+ * NOTE: This is a special case handled in timeoutLoop().
+ *
+ * Since the client code looks more or less like:
+ *
+ *
+ * procId = master.doOperation()
+ * while (master.getProcResult(procId) == ProcInProgress);
+ *
+ *
+ * The master should not throw away the proc result as soon as the procedure is done but should wait
+ * a result request from the client (see executor.removeResult(procId)) The client will call
+ * something like master.isProcDone() or master.getProcResult() which will return the result/state
+ * to the client, and it will mark the completed proc as ready to delete. note that the client may
+ * not receive the response from the master (e.g. master failover) so, if we delay a bit the real
+ * deletion of the proc result the client will be able to get the result the next try.
+ */
+@InterfaceAudience.Private
+class CompletedProcedureCleaner extends ProcedureInMemoryChore {
+ private static final Logger LOG = LoggerFactory.getLogger(CompletedProcedureCleaner.class);
+
+ static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
+ private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
+
+ private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
+ private static final int DEFAULT_BATCH_SIZE = 32;
+
+ private final Map> completed;
+ private final Map nonceKeysToProcIdsMap;
+ private final ProcedureStore store;
+ private final IdLock procExecutionLock;
+ private Configuration conf;
+
+ public CompletedProcedureCleaner(Configuration conf, ProcedureStore store,
+ IdLock procExecutionLock, Map> completedMap,
+ Map nonceKeysToProcIdsMap) {
+ // set the timeout interval that triggers the periodic-procedure
+ super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
+ this.completed = completedMap;
+ this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
+ this.store = store;
+ this.procExecutionLock = procExecutionLock;
+ this.conf = conf;
+ }
+
+ @Override
+ protected void periodicExecute(final TEnvironment env) {
+ if (completed.isEmpty()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("No completed procedures to cleanup.");
+ }
+ return;
+ }
+
+ final long evictTtl =
+ conf.getInt(ProcedureExecutor.EVICT_TTL_CONF_KEY, ProcedureExecutor.DEFAULT_EVICT_TTL);
+ final long evictAckTtl = conf.getInt(ProcedureExecutor.EVICT_ACKED_TTL_CONF_KEY,
+ ProcedureExecutor.DEFAULT_ACKED_EVICT_TTL);
+ final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
+
+ final long[] batchIds = new long[batchSize];
+ int batchCount = 0;
+
+ final long now = EnvironmentEdgeManager.currentTime();
+ final Iterator>> it =
+ completed.entrySet().iterator();
+ while (it.hasNext() && store.isRunning()) {
+ final Map.Entry> entry = it.next();
+ final CompletedProcedureRetainer retainer = entry.getValue();
+ final Procedure> proc = retainer.getProcedure();
+ IdLock.Entry lockEntry;
+ try {
+ lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
+ } catch (IOException e) {
+ // can only happen if interrupted, so not a big deal to propagate it
+ throw new UncheckedIOException(e);
+ }
+ try {
+ // TODO: Select TTL based on Procedure type
+ if (retainer.isExpired(now, evictTtl, evictAckTtl)) {
+ // Failed procedures aren't persisted in WAL.
+ if (!(proc instanceof FailedProcedure)) {
+ batchIds[batchCount++] = entry.getKey();
+ if (batchCount == batchIds.length) {
+ store.delete(batchIds, 0, batchCount);
+ batchCount = 0;
+ }
+ }
+ final NonceKey nonceKey = proc.getNonceKey();
+ if (nonceKey != null) {
+ nonceKeysToProcIdsMap.remove(nonceKey);
+ }
+ it.remove();
+ LOG.trace("Evict completed {}", proc);
+ }
+ } finally {
+ procExecutionLock.releaseLockEntry(lockEntry);
+ }
+ }
+ if (batchCount > 0) {
+ store.delete(batchIds, 0, batchCount);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureRetainer.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureRetainer.java
new file mode 100644
index 00000000000..d5f1ee7f6c3
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureRetainer.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Hold the reference to a completed root procedure. Will be cleaned up after expired.
+ */
+@InterfaceAudience.Private
+class CompletedProcedureRetainer {
+ private final Procedure procedure;
+ private long clientAckTime;
+
+ public CompletedProcedureRetainer(Procedure procedure) {
+ this.procedure = procedure;
+ clientAckTime = -1;
+ }
+
+ public Procedure getProcedure() {
+ return procedure;
+ }
+
+ public boolean hasClientAckTime() {
+ return clientAckTime != -1;
+ }
+
+ public long getClientAckTime() {
+ return clientAckTime;
+ }
+
+ public void setClientAckTime(long clientAckTime) {
+ this.clientAckTime = clientAckTime;
+ }
+
+ public boolean isExpired(long now, long evictTtl, long evictAckTtl) {
+ return (hasClientAckTime() && (now - getClientAckTime()) >= evictAckTtl) ||
+ (now - procedure.getLastUpdate()) >= evictTtl;
+ }
+}
\ No newline at end of file
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/FailedProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/FailedProcedure.java
new file mode 100644
index 00000000000..8e7db44ad66
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/FailedProcedure.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.util.Objects;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.NonceKey;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+@InterfaceAudience.Private
+class FailedProcedure extends Procedure {
+
+ private String procName;
+
+ public FailedProcedure() {
+ }
+
+ public FailedProcedure(long procId, String procName, User owner, NonceKey nonceKey,
+ IOException exception) {
+ this.procName = procName;
+ setProcId(procId);
+ setState(ProcedureState.ROLLEDBACK);
+ setOwner(owner);
+ setNonceKey(nonceKey);
+ long currentTime = EnvironmentEdgeManager.currentTime();
+ setSubmittedTime(currentTime);
+ setLastUpdate(currentTime);
+ setFailure(Objects.toString(exception.getMessage(), ""), exception);
+ }
+
+ @Override
+ public String getProcName() {
+ return procName;
+ }
+
+ @Override
+ protected Procedure[] execute(TEnvironment env)
+ throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected void rollback(TEnvironment env) throws IOException, InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected boolean abort(TEnvironment env) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ }
+}
\ No newline at end of file
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 542c5968e47..084da21b83c 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -18,16 +18,14 @@
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -87,6 +85,12 @@ public class ProcedureExecutor {
"hbase.procedure.worker.keep.alive.time.msec";
private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);
+ public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
+ static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
+
+ public static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
+ static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
+
/**
* {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to
* break PE having it fail at various junctures. When non-null, testing is set to an instance of
@@ -155,134 +159,6 @@ public class ProcedureExecutor {
void procedureFinished(long procId);
}
- private static final class CompletedProcedureRetainer {
- private final Procedure procedure;
- private long clientAckTime;
-
- public CompletedProcedureRetainer(Procedure procedure) {
- this.procedure = procedure;
- clientAckTime = -1;
- }
-
- public Procedure getProcedure() {
- return procedure;
- }
-
- public boolean hasClientAckTime() {
- return clientAckTime != -1;
- }
-
- public long getClientAckTime() {
- return clientAckTime;
- }
-
- public void setClientAckTime(long clientAckTime) {
- this.clientAckTime = clientAckTime;
- }
-
- public boolean isExpired(long now, long evictTtl, long evictAckTtl) {
- return (hasClientAckTime() && (now - getClientAckTime()) >= evictAckTtl) ||
- (now - procedure.getLastUpdate()) >= evictTtl;
- }
- }
-
- /**
- * Internal cleaner that removes the completed procedure results after a TTL.
- * NOTE: This is a special case handled in timeoutLoop().
- *
- * Since the client code looks more or less like:
- *
- * procId = master.doOperation()
- * while (master.getProcResult(procId) == ProcInProgress);
- *
- * The master should not throw away the proc result as soon as the procedure is done
- * but should wait a result request from the client (see executor.removeResult(procId))
- * The client will call something like master.isProcDone() or master.getProcResult()
- * which will return the result/state to the client, and it will mark the completed
- * proc as ready to delete. note that the client may not receive the response from
- * the master (e.g. master failover) so, if we delay a bit the real deletion of
- * the proc result the client will be able to get the result the next try.
- */
- private static class CompletedProcedureCleaner
- extends ProcedureInMemoryChore {
- private static final Logger LOG = LoggerFactory.getLogger(CompletedProcedureCleaner.class);
-
- private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
- private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
-
- private static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
- private static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
-
- private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
- private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
-
- private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
- private static final int DEFAULT_BATCH_SIZE = 32;
-
- private final Map> completed;
- private final Map nonceKeysToProcIdsMap;
- private final ProcedureStore store;
- private Configuration conf;
-
- public CompletedProcedureCleaner(Configuration conf, final ProcedureStore store,
- final Map> completedMap,
- final Map nonceKeysToProcIdsMap) {
- // set the timeout interval that triggers the periodic-procedure
- super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
- this.completed = completedMap;
- this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
- this.store = store;
- this.conf = conf;
- }
-
- @Override
- protected void periodicExecute(final TEnvironment env) {
- if (completed.isEmpty()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("No completed procedures to cleanup.");
- }
- return;
- }
-
- final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
- final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
- final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
-
- final long[] batchIds = new long[batchSize];
- int batchCount = 0;
-
- final long now = EnvironmentEdgeManager.currentTime();
- final Iterator>> it =
- completed.entrySet().iterator();
- while (it.hasNext() && store.isRunning()) {
- final Map.Entry> entry = it.next();
- final CompletedProcedureRetainer retainer = entry.getValue();
- final Procedure> proc = retainer.getProcedure();
-
- // TODO: Select TTL based on Procedure type
- if (retainer.isExpired(now, evictTtl, evictAckTtl)) {
- // Failed procedures aren't persisted in WAL.
- if (!(proc instanceof FailedProcedure)) {
- batchIds[batchCount++] = entry.getKey();
- if (batchCount == batchIds.length) {
- store.delete(batchIds, 0, batchCount);
- batchCount = 0;
- }
- }
- final NonceKey nonceKey = proc.getNonceKey();
- if (nonceKey != null) {
- nonceKeysToProcIdsMap.remove(nonceKey);
- }
- it.remove();
- LOG.trace("Evict completed {}", proc);
- }
- }
- if (batchCount > 0) {
- store.delete(batchIds, 0, batchCount);
- }
- }
- }
-
/**
* Map the the procId returned by submitProcedure(), the Root-ProcID, to the Procedure.
* Once a Root-Procedure completes (success or failure), the result will be added to this map.
@@ -385,15 +261,26 @@ public class ProcedureExecutor {
IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
try {
Procedure proc = procedures.get(procId);
- if (proc == null) {
- LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
- return;
- }
- // For a sub procedure which root parent has not been finished, we still need to retain the
- // wal even if the procedure itself is finished.
- if (proc.isFinished() && (!proc.hasParent() || isRootFinished(proc))) {
- LOG.debug("Procedure {} has already been finished, skip force updating.", proc);
- return;
+ if (proc != null) {
+ if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {
+ LOG.debug("Procedure {} has already been finished and parent is succeeded," +
+ " skip force updating", proc);
+ return;
+ }
+ } else {
+ CompletedProcedureRetainer retainer = completed.get(procId);
+ if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) {
+ LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
+ return;
+ }
+ long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
+ long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
+ if (retainer.isExpired(System.currentTimeMillis(), evictTtl, evictAckTtl)) {
+ LOG.debug("Procedure {} has already been finished and expired, skip force updating",
+ procId);
+ return;
+ }
+ proc = retainer.getProcedure();
}
LOG.debug("Force update procedure {}", proc);
store.update(proc);
@@ -731,7 +618,8 @@ public class ProcedureExecutor {
timeoutExecutor.add(new WorkerMonitor());
// Add completed cleaner chore
- addChore(new CompletedProcedureCleaner<>(conf, store, completed, nonceKeysToProcIdsMap));
+ addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
+ nonceKeysToProcIdsMap));
}
public void stop() {
@@ -918,59 +806,6 @@ public class ProcedureExecutor {
}
}
- public static class FailedProcedure extends Procedure {
- private String procName;
-
- public FailedProcedure() {
- }
-
- public FailedProcedure(long procId, String procName, User owner,
- NonceKey nonceKey, IOException exception) {
- this.procName = procName;
- setProcId(procId);
- setState(ProcedureState.ROLLEDBACK);
- setOwner(owner);
- setNonceKey(nonceKey);
- long currentTime = EnvironmentEdgeManager.currentTime();
- setSubmittedTime(currentTime);
- setLastUpdate(currentTime);
- setFailure(Objects.toString(exception.getMessage(), ""), exception);
- }
-
- @Override
- public String getProcName() {
- return procName;
- }
-
- @Override
- protected Procedure[] execute(TEnvironment env)
- throws ProcedureYieldException, ProcedureSuspendedException,
- InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected void rollback(TEnvironment env)
- throws IOException, InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected boolean abort(TEnvironment env) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected void serializeStateData(ProcedureStateSerializer serializer)
- throws IOException {
- }
-
- @Override
- protected void deserializeStateData(ProcedureStateSerializer serializer)
- throws IOException {
- }
- }
-
/**
* If the failure failed before submitting it, we may want to give back the
* same error to the requests with the same nonceKey.
@@ -1616,53 +1451,74 @@ public class ProcedureExecutor {
int stackTail = subprocStack.size();
while (stackTail-- > 0) {
Procedure proc = subprocStack.get(stackTail);
- // For the sub procedures which are successfully finished, we do not rollback them.
- // Typically, if we want to rollback a procedure, we first need to rollback it, and then
- // recursively rollback its ancestors. The state changes which are done by sub procedures
- // should be handled by parent procedures when rolling back. For example, when rolling back a
- // MergeTableProcedure, we will schedule new procedures to bring the offline regions online,
- // instead of rolling back the original procedures which offlined the regions(in fact these
- // procedures can not be rolled back...).
- if (proc.isSuccess()) {
- // Just do the cleanup work, without actually executing the rollback
+ IdLock.Entry lockEntry = null;
+ // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need
+ // this check, as the worker will hold the lock before executing a procedure. This is the only
+ // place where we may hold two procedure execution locks, and there is a fence in the
+ // RootProcedureState where we can make sure that only one worker can execute the rollback of
+ // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to
+ // prevent race between us and the force update thread.
+ if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) {
+ try {
+ lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
+ } catch (IOException e) {
+ // can only happen if interrupted, so not a big deal to propagate it
+ throw new UncheckedIOException(e);
+ }
+ }
+ try {
+ // For the sub procedures which are successfully finished, we do not rollback them.
+ // Typically, if we want to rollback a procedure, we first need to rollback it, and then
+ // recursively rollback its ancestors. The state changes which are done by sub procedures
+ // should be handled by parent procedures when rolling back. For example, when rolling back
+ // a MergeTableProcedure, we will schedule new procedures to bring the offline regions
+ // online, instead of rolling back the original procedures which offlined the regions(in
+ // fact these procedures can not be rolled back...).
+ if (proc.isSuccess()) {
+ // Just do the cleanup work, without actually executing the rollback
+ subprocStack.remove(stackTail);
+ cleanupAfterRollbackOneStep(proc);
+ continue;
+ }
+ LockState lockState = acquireLock(proc);
+ if (lockState != LockState.LOCK_ACQUIRED) {
+ // can't take a lock on the procedure, add the root-proc back on the
+ // queue waiting for the lock availability
+ return lockState;
+ }
+
+ lockState = executeRollback(proc);
+ releaseLock(proc, false);
+ boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
+ abortRollback |= !isRunning() || !store.isRunning();
+
+ // allows to kill the executor before something is stored to the wal.
+ // useful to test the procedure recovery.
+ if (abortRollback) {
+ return lockState;
+ }
+
subprocStack.remove(stackTail);
- cleanupAfterRollbackOneStep(proc);
- continue;
- }
- LockState lockState = acquireLock(proc);
- if (lockState != LockState.LOCK_ACQUIRED) {
- // can't take a lock on the procedure, add the root-proc back on the
- // queue waiting for the lock availability
- return lockState;
- }
- lockState = executeRollback(proc);
- releaseLock(proc, false);
- boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
- abortRollback |= !isRunning() || !store.isRunning();
+ // if the procedure is kind enough to pass the slot to someone else, yield
+ // if the proc is already finished, do not yield
+ if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
+ return LockState.LOCK_YIELD_WAIT;
+ }
- // allows to kill the executor before something is stored to the wal.
- // useful to test the procedure recovery.
- if (abortRollback) {
- return lockState;
- }
-
- subprocStack.remove(stackTail);
-
- // if the procedure is kind enough to pass the slot to someone else, yield
- // if the proc is already finished, do not yield
- if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
- return LockState.LOCK_YIELD_WAIT;
- }
-
- if (proc != rootProc) {
- execCompletionCleanup(proc);
+ if (proc != rootProc) {
+ execCompletionCleanup(proc);
+ }
+ } finally {
+ if (lockEntry != null) {
+ procExecutionLock.releaseLockEntry(lockEntry);
+ }
}
}
// Finalize the procedure state
- LOG.info("Rolled back " + rootProc +
- " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()));
+ LOG.info("Rolled back {} exec-time={}", rootProc,
+ StringUtils.humanTimeDiff(rootProc.elapsedTime()));
procedureFinished(rootProc);
return LockState.LOCK_ACQUIRED;
}
@@ -2047,6 +1903,11 @@ public class ProcedureExecutor {
return scheduler;
}
+ @VisibleForTesting
+ int getCompletedSize() {
+ return completed.size();
+ }
+
// ==========================================================================
// Worker Thread
// ==========================================================================
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestForceUpdateProcedure.java
similarity index 63%
rename from hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java
rename to hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestForceUpdateProcedure.java
index df6ee51efe3..178b0cbc98a 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestForceUpdateProcedure.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.procedure2.store.wal;
+package org.apache.hadoop.hbase.procedure2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -27,19 +27,19 @@ import java.util.concurrent.Exchanger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
-import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
@@ -60,8 +60,12 @@ public class TestForceUpdateProcedure {
private static int WAL_COUNT = 5;
- private static void createStoreAndExecutor() throws IOException {
- Path logDir = UTIL.getDataTestDir("proc-wals");
+ @Rule
+ public final TestName name = new TestName();
+
+ private void createStoreAndExecutor() throws IOException {
+ UTIL.getConfiguration().setInt(CompletedProcedureCleaner.CLEANER_INTERVAL_CONF_KEY, 1000);
+ Path logDir = UTIL.getDataTestDir(name.getMethodName());
STORE = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir);
STORE.start(1);
EXEC = new ProcedureExecutor(UTIL.getConfiguration(), null, STORE);
@@ -69,12 +73,11 @@ public class TestForceUpdateProcedure {
}
@BeforeClass
- public static void setUp() throws IOException {
+ public static void setUpBeforeClass() throws IOException {
UTIL.getConfiguration().setInt(WALProcedureStore.WAL_COUNT_WARN_THRESHOLD_CONF_KEY, WAL_COUNT);
- createStoreAndExecutor();
}
- private static void stopStoreAndExecutor() {
+ private void stopStoreAndExecutor() {
EXEC.stop();
STORE.stop(false);
EXEC = null;
@@ -82,12 +85,21 @@ public class TestForceUpdateProcedure {
}
@AfterClass
- public static void tearDown() throws IOException {
- stopStoreAndExecutor();
+ public static void tearDownAfterClass() throws IOException {
UTIL.cleanupTestDir();
}
- public static final class WaitingProcedure extends Procedure {
+ @Before
+ public void setUp() throws IOException {
+ createStoreAndExecutor();
+ }
+
+ @After
+ public void tearDown() {
+ stopStoreAndExecutor();
+ }
+
+ public static final class WaitingProcedure extends NoopProcedure {
@Override
protected Procedure[] execute(Void env)
@@ -97,82 +109,19 @@ public class TestForceUpdateProcedure {
setTimeout(Integer.MAX_VALUE);
throw new ProcedureSuspendedException();
}
-
- @Override
- protected void rollback(Void env) throws IOException, InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected boolean abort(Void env) {
- return false;
- }
-
- @Override
- protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
- }
-
- @Override
- protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
- }
}
- public static final class ParentProcedure extends Procedure {
+ public static final class ParentProcedure extends NoopProcedure {
@SuppressWarnings("unchecked")
@Override
protected Procedure[] execute(Void env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
- return new Procedure[] { new DummyProcedure(), new WaitingProcedure() };
- }
-
- @Override
- protected void rollback(Void env) throws IOException, InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected boolean abort(Void env) {
- return false;
- }
-
- @Override
- protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
- }
-
- @Override
- protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ return new Procedure[] { new NoopProcedure<>(), new WaitingProcedure() };
}
}
- public static final class DummyProcedure extends Procedure {
-
- @Override
- protected Procedure[] execute(Void env)
- throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
- return null;
- }
-
- @Override
- protected void rollback(Void env) throws IOException, InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected boolean abort(Void env) {
- return false;
- }
-
- @Override
- protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
- }
-
- @Override
- protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
- }
- }
-
- public static final class ExchangeProcedure extends Procedure {
+ public static final class ExchangeProcedure extends NoopProcedure {
@SuppressWarnings("unchecked")
@Override
@@ -184,28 +133,18 @@ public class TestForceUpdateProcedure {
return null;
}
}
+ }
+
+ public static final class NoopNoAckProcedure extends NoopProcedure {
@Override
- protected void rollback(Void env) throws IOException, InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected boolean abort(Void env) {
+ protected boolean shouldWaitClientAck(Void env) {
return false;
}
-
- @Override
- protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
- }
-
- @Override
- protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
- }
}
@Test
- public void test() throws IOException, InterruptedException {
+ public void testProcedureStuck() throws IOException, InterruptedException {
EXEC.submitProcedure(new ParentProcedure());
EXCHANGER.exchange(Boolean.TRUE);
UTIL.waitFor(10000, () -> EXEC.getActiveExecutorCount() == 0);
@@ -240,7 +179,26 @@ public class TestForceUpdateProcedure {
assertEquals(ProcedureState.WAITING, parentProc.getState());
WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class);
assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState());
- DummyProcedure dummyProc = (DummyProcedure) procMap.get(DummyProcedure.class);
- assertEquals(ProcedureState.SUCCESS, dummyProc.getState());
+ NoopProcedure noopProc = (NoopProcedure) procMap.get(NoopProcedure.class);
+ assertEquals(ProcedureState.SUCCESS, noopProc.getState());
+ }
+
+ @Test
+ public void testCompletedProcedure() throws InterruptedException, IOException {
+ long procId = EXEC.submitProcedure(new ExchangeProcedure());
+ EXCHANGER.exchange(Boolean.FALSE);
+ UTIL.waitFor(10000, () -> EXEC.isFinished(procId));
+ for (int i = 0; i < WAL_COUNT - 1; i++) {
+ assertTrue(STORE.rollWriterForTesting());
+ // The exchange procedure is completed but still not deleted yet so we can not delete the
+ // oldest wal file
+ long pid = EXEC.submitProcedure(new NoopNoAckProcedure());
+ assertEquals(2 + i, STORE.getActiveLogs().size());
+ UTIL.waitFor(10000, () -> EXEC.isFinished(pid));
+ }
+ // Only the exchange procedure can not be deleted
+ UTIL.waitFor(10000, () -> EXEC.getCompletedSize() == 1);
+ STORE.rollWriterForTesting();
+ UTIL.waitFor(10000, () -> STORE.getActiveLogs().size() <= 1);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index e2e4aec00b6..193f8456ab8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
* avoiding port contention if another local HBase instance is already running).
* To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
* setting it to true.
+ * For triggering pre commit
*/
@InterfaceAudience.Public
@SuppressWarnings("deprecation")