HBASE-16744 Procedure V2 - Lock procedures to allow clients to acquire locks on tables/namespaces/regions (Matteo Bertozzi)

Incorporates review comments from
    https://reviews.apache.org/r/52589/
    https://reviews.apache.org/r/54388/

M hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
 Fix for eclipse complaint (from Duo Zhang)

M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
 Log formatting

M hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
 Added utility methods for waiting on procedures.

A hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/LockServiceProtos.java
A hbase-protocol-shaded/src/main/protobuf/LockService.proto
 Implement new locking CP overrides.

A hbase-server/src/main/java/org/apache/hadoop/hbase/client/locking/EntityLock.java
 New HBase entity lock (namespace, table, or regions)

A hbase-server/src/main/java/org/apache/hadoop/hbase/client/locking/LockServiceClient.java
 Client that can use the new internal locking service.
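
For orientation, a minimal client-side sketch in Java of how the new classes fit together. The LockServiceClient and EntityLock calls are the APIs added in this commit; how the Configuration, LockService stub, NonceGenerator and Abortable (conf, stub, ng, abortable below) are obtained is not shown here and is assumed, and checked IOException/InterruptedException handling is omitted:

    // Hedged usage sketch of the new client-side locking API.
    LockServiceClient client = new LockServiceClient(conf, stub, ng);
    EntityLock lock = client.tableLock(TableName.valueOf("t1"), true /* exclusive */,
        "example table lock", abortable);
    lock.requestLock();                      // queue the lock on master, start heartbeat worker
    try {
      if (lock.await(60000, TimeUnit.MILLISECONDS)) {
        // ... work that needs mutual exclusion on the table ...
      }
    } finally {
      lock.unlock();                         // always call, even if the lock was never acquired
    }
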
Michael Stack 2017-01-11 14:38:59 -08:00
parent 9fd5dab1a8
commit 4cb09a494c
26 changed files with 7709 additions and 87 deletions

View File

@ -386,8 +386,8 @@ public interface AsyncTableBase {
* @return A list of {@link CompletableFuture}s that represent the existence for each get.
*/
default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
return get(toCheckExistenceOnly(gets)).stream().map(f -> f.thenApply(r -> r.getExists()))
.collect(toList());
return get(toCheckExistenceOnly(gets)).stream().
<CompletableFuture<Boolean>>map(f -> f.thenApply(r -> r.getExists())).collect(toList());
}
/**

View File

@ -243,24 +243,24 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
final StringBuilder sb = new StringBuilder();
toStringClassDetails(sb);
sb.append(" id=");
sb.append(", procId=");
sb.append(getProcId());
if (hasParent()) {
sb.append(" parent=");
sb.append(", parent=");
sb.append(getParentProcId());
}
if (hasOwner()) {
sb.append(" owner=");
sb.append(", owner=");
sb.append(getOwner());
}
sb.append(" state=");
sb.append(", state=");
toStringState(sb);
if (hasException()) {
sb.append(" failed=" + getException());
sb.append(", failed=" + getException());
}
return sb;

View File

@ -28,6 +28,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -91,7 +92,7 @@ public class ProcedureExecutor<TEnvironment> {
final boolean kill = this.killBeforeStoreUpdate;
if (this.toggleKillBeforeStoreUpdate) {
this.killBeforeStoreUpdate = !kill;
LOG.warn("Toggle Kill before store update to: " + this.killBeforeStoreUpdate);
LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate);
}
return kill;
}
@ -172,7 +173,7 @@ public class ProcedureExecutor<TEnvironment> {
final long now = EnvironmentEdgeManager.currentTime();
final Iterator<Map.Entry<Long, ProcedureInfo>> it = completed.entrySet().iterator();
final boolean isDebugEnabled = LOG.isDebugEnabled();
final boolean debugEnabled = LOG.isDebugEnabled();
while (it.hasNext() && store.isRunning()) {
final Map.Entry<Long, ProcedureInfo> entry = it.next();
final ProcedureInfo procInfo = entry.getValue();
@ -180,8 +181,8 @@ public class ProcedureExecutor<TEnvironment> {
// TODO: Select TTL based on Procedure type
if ((procInfo.hasClientAckTime() && (now - procInfo.getClientAckTime()) >= evictAckTtl) ||
(now - procInfo.getLastUpdate()) >= evictTtl) {
if (isDebugEnabled) {
LOG.debug("Evict completed procedure: " + procInfo);
if (debugEnabled) {
LOG.debug("Evict completed " + procInfo);
}
batchIds[batchCount++] = entry.getKey();
if (batchCount == batchIds.length) {
@ -281,7 +282,7 @@ public class ProcedureExecutor<TEnvironment> {
@Override
public void setMaxProcId(long maxProcId) {
assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
LOG.debug("load procedures maxProcId=" + maxProcId);
LOG.debug("Load maxProcId=" + maxProcId);
lastProcId.set(maxProcId);
}
@ -295,7 +296,7 @@ public class ProcedureExecutor<TEnvironment> {
int corruptedCount = 0;
while (procIter.hasNext()) {
ProcedureInfo proc = procIter.nextAsProcedureInfo();
LOG.error("corrupted procedure: " + proc);
LOG.error("Corrupt " + proc);
corruptedCount++;
}
if (abortOnCorruption && corruptedCount > 0) {
@ -307,7 +308,7 @@ public class ProcedureExecutor<TEnvironment> {
private void loadProcedures(final ProcedureIterator procIter,
final boolean abortOnCorruption) throws IOException {
final boolean isDebugEnabled = LOG.isDebugEnabled();
final boolean debugEnabled = LOG.isDebugEnabled();
// 1. Build the rollback stack
int runnablesCount = 0;
@ -320,8 +321,8 @@ public class ProcedureExecutor<TEnvironment> {
nonceKey = proc.getNonceKey();
procId = proc.getProcId();
completed.put(proc.getProcId(), proc);
if (isDebugEnabled) {
LOG.debug("The procedure is completed: " + proc);
if (debugEnabled) {
LOG.debug("Completed " + proc);
}
} else {
Procedure proc = procIter.nextAsProcedure();
@ -361,8 +362,8 @@ public class ProcedureExecutor<TEnvironment> {
Procedure proc = procIter.nextAsProcedure();
assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
if (isDebugEnabled) {
LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s",
if (debugEnabled) {
LOG.debug(String.format("Loading state=%s isFailed=%s: %s",
proc.getState(), proc.hasException(), proc));
}
@ -425,7 +426,7 @@ public class ProcedureExecutor<TEnvironment> {
if (procStack.isValid()) continue;
for (Procedure proc: procStack.getSubproceduresStack()) {
LOG.error("corrupted procedure: " + proc);
LOG.error("Corrupted " + proc);
procedures.remove(proc.getProcId());
runnableList.remove(proc);
if (waitingSet != null) waitingSet.remove(proc);
@ -485,7 +486,7 @@ public class ProcedureExecutor<TEnvironment> {
// We have numThreads executor + one timer thread used for timing out
// procedures and triggering periodic procedures.
this.corePoolSize = numThreads;
LOG.info("Starting procedure executor threads=" + corePoolSize);
LOG.info("Starting executor threads=" + corePoolSize);
// Create the Thread Group for the executors
threadGroup = new ThreadGroup("ProcedureExecutor");
@ -506,7 +507,7 @@ public class ProcedureExecutor<TEnvironment> {
st = EnvironmentEdgeManager.currentTime();
store.recoverLease();
et = EnvironmentEdgeManager.currentTime();
LOG.info(String.format("recover procedure store (%s) lease: %s",
LOG.info(String.format("Recover store (%s) lease: %s",
store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
// start the procedure scheduler
@ -520,11 +521,11 @@ public class ProcedureExecutor<TEnvironment> {
st = EnvironmentEdgeManager.currentTime();
load(abortOnCorruption);
et = EnvironmentEdgeManager.currentTime();
LOG.info(String.format("load procedure store (%s): %s",
LOG.info(String.format("Load store (%s): %s",
store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
// Start the executors. Here we must have the lastProcId set.
LOG.debug("start workers " + workerThreads.size());
LOG.debug("Start workers " + workerThreads.size());
timeoutExecutor.start();
for (WorkerThread worker: workerThreads) {
worker.start();
@ -542,7 +543,7 @@ public class ProcedureExecutor<TEnvironment> {
return;
}
LOG.info("Stopping the procedure executor");
LOG.info("Stopping");
scheduler.stop();
timeoutExecutor.sendStopSignal();
}
@ -564,7 +565,7 @@ public class ProcedureExecutor<TEnvironment> {
try {
threadGroup.destroy();
} catch (IllegalThreadStateException e) {
LOG.error("thread group " + threadGroup + " contains running threads");
LOG.error("Thread group " + threadGroup + " contains running threads");
threadGroup.list();
} finally {
threadGroup = null;
@ -693,12 +694,12 @@ public class ProcedureExecutor<TEnvironment> {
// we found a registered nonce, but the procedure may not have been submitted yet.
// since the client expect the procedure to be submitted, spin here until it is.
final boolean isTraceEnabled = LOG.isTraceEnabled();
final boolean traceEnabled = LOG.isTraceEnabled();
while (isRunning() &&
!(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
nonceKeysToProcIdsMap.containsKey(nonceKey)) {
if (isTraceEnabled) {
LOG.trace("waiting for procId=" + oldProcId.longValue() + " to be submitted");
if (traceEnabled) {
LOG.trace("Waiting for procId=" + oldProcId.longValue() + " to be submitted");
}
Threads.sleep(100);
}
@ -787,7 +788,7 @@ public class ProcedureExecutor<TEnvironment> {
// Commit the transaction
store.insert(proc, null);
if (LOG.isDebugEnabled()) {
LOG.debug("Procedure " + proc + " added to the store.");
LOG.debug("Stored " + proc);
}
// Add the procedure to the executor
@ -811,7 +812,7 @@ public class ProcedureExecutor<TEnvironment> {
// Commit the transaction
store.insert(procs);
if (LOG.isDebugEnabled()) {
LOG.debug("Procedures added to the store: " + Arrays.toString(procs));
LOG.debug("Stored " + Arrays.toString(procs));
}
// Add the procedure to the executor
@ -880,6 +881,14 @@ public class ProcedureExecutor<TEnvironment> {
return procedures.get(procId);
}
public <T extends Procedure> T getProcedure(final Class<T> clazz, final long procId) {
final Procedure proc = getProcedure(procId);
if (clazz.isInstance(proc)) {
return (T)proc;
}
return null;
}
public ProcedureInfo getResult(final long procId) {
return completed.get(procId);
}
@ -917,7 +926,7 @@ public class ProcedureExecutor<TEnvironment> {
if (result == null) {
assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
if (LOG.isDebugEnabled()) {
LOG.debug("Procedure procId=" + procId + " already removed by the cleaner.");
LOG.debug("procId=" + procId + " already removed by the cleaner.");
}
return;
}
@ -999,7 +1008,7 @@ public class ProcedureExecutor<TEnvironment> {
try {
listener.procedureLoaded(procId);
} catch (Throwable e) {
LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
}
}
}
@ -1011,7 +1020,7 @@ public class ProcedureExecutor<TEnvironment> {
try {
listener.procedureAdded(procId);
} catch (Throwable e) {
LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
}
}
}
@ -1023,7 +1032,7 @@ public class ProcedureExecutor<TEnvironment> {
try {
listener.procedureFinished(procId);
} catch (Throwable e) {
LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
}
}
}
@ -1053,6 +1062,11 @@ public class ProcedureExecutor<TEnvironment> {
return lastProcId.get();
}
@VisibleForTesting
public Set<Long> getActiveProcIds() {
return procedures.keySet();
}
private Long getRootProcedureId(Procedure proc) {
return Procedure.getRootProcedureId(procedures, proc);
}
@ -1111,7 +1125,7 @@ public class ProcedureExecutor<TEnvironment> {
if (proc.isSuccess()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Procedure completed in " +
LOG.debug("Completed in " +
StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
}
// Finalize the procedure state
@ -1203,7 +1217,7 @@ public class ProcedureExecutor<TEnvironment> {
}
// Finalize the procedure state
LOG.info("Rolledback procedure " + rootProc +
LOG.info("Rolled back " + rootProc +
" exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) +
" exception=" + exception.getMessage());
procedureFinished(rootProc);
@ -1220,7 +1234,7 @@ public class ProcedureExecutor<TEnvironment> {
proc.doRollback(getEnvironment());
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("rollback attempt failed for " + proc, e);
LOG.debug("Roll back attempt failed for " + proc, e);
}
return false;
} catch (InterruptedException e) {
@ -1294,7 +1308,7 @@ public class ProcedureExecutor<TEnvironment> {
isSuspended = true;
} catch (ProcedureYieldException e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Yield procedure: " + procedure + ": " + e.getMessage());
LOG.trace("Yield " + procedure + ": " + e.getMessage());
}
scheduler.yield(procedure);
return;
@ -1418,8 +1432,8 @@ public class ProcedureExecutor<TEnvironment> {
}
// If this procedure is the last child awake the parent procedure
final boolean isTraceEnabled = LOG.isTraceEnabled();
if (isTraceEnabled) {
final boolean traceEnabled = LOG.isTraceEnabled();
if (traceEnabled) {
LOG.trace(parent + " child is done: " + procedure);
}
@ -1427,7 +1441,7 @@ public class ProcedureExecutor<TEnvironment> {
parent.setState(ProcedureState.RUNNABLE);
store.update(parent);
scheduler.addFront(parent);
if (isTraceEnabled) {
if (traceEnabled) {
LOG.trace(parent + " all the children finished their work, resume.");
}
return;
@ -1438,7 +1452,7 @@ public class ProcedureExecutor<TEnvironment> {
final Procedure procedure, final Procedure[] subprocs) {
if (subprocs != null && !procedure.isFailed()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs));
LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));
}
store.insert(procedure, subprocs);
} else {
@ -1464,7 +1478,7 @@ public class ProcedureExecutor<TEnvironment> {
private void handleInterruptedException(final Procedure proc, final InterruptedException e) {
if (LOG.isTraceEnabled()) {
LOG.trace("got an interrupt during " + proc + ". suspend and retry it later.", e);
LOG.trace("Interrupt during " + proc + ". suspend and retry it later.", e);
}
// NOTE: We don't call Thread.currentThread().interrupt()
@ -1530,7 +1544,7 @@ public class ProcedureExecutor<TEnvironment> {
@Override
public void run() {
final boolean isTraceEnabled = LOG.isTraceEnabled();
final boolean traceEnabled = LOG.isTraceEnabled();
long lastUpdate = EnvironmentEdgeManager.currentTime();
while (isRunning() && keepAlive(lastUpdate)) {
final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
@ -1539,7 +1553,7 @@ public class ProcedureExecutor<TEnvironment> {
store.setRunningProcedureCount(activeExecutorCount.incrementAndGet());
executionStartTime.set(EnvironmentEdgeManager.currentTime());
try {
if (isTraceEnabled) {
if (traceEnabled) {
LOG.trace("Trying to start the execution of " + procedure);
}
executeProcedure(procedure);
@ -1549,7 +1563,7 @@ public class ProcedureExecutor<TEnvironment> {
executionStartTime.set(Long.MAX_VALUE);
}
}
LOG.debug("worker thread terminated " + this);
LOG.debug("Worker thread terminated " + this);
workerThreads.remove(this);
}
@ -1691,7 +1705,7 @@ public class ProcedureExecutor<TEnvironment> {
sendStopSignal();
join(250);
if (i > 0 && (i % 8) == 0) {
LOG.warn("waiting termination of thread " + getName() + ", " +
LOG.warn("Waiting termination of thread " + getName() + ", " +
StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
}
}
@ -1767,7 +1781,7 @@ public class ProcedureExecutor<TEnvironment> {
// WARN the worker is stuck
stuckCount++;
LOG.warn("found worker stuck " + worker +
LOG.warn("Worker stuck " + worker +
" run time " + StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
}
return stuckCount;
@ -1785,7 +1799,7 @@ public class ProcedureExecutor<TEnvironment> {
final WorkerThread worker = new WorkerThread(threadGroup);
workerThreads.add(worker);
worker.start();
LOG.debug("added a new worker thread " + worker);
LOG.debug("Added new worker thread " + worker);
}
}

View File

@ -294,7 +294,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
@Override
public void setRunningProcedureCount(final int count) {
LOG.debug("set running procedure count=" + count + " slots=" + slots.length);
LOG.debug("Set running procedure count=" + count + ", slots=" + slots.length);
this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length;
}
@ -326,7 +326,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
try {
flushLogId = initOldLogs(oldLogs);
} catch (FileNotFoundException e) {
LOG.warn("someone else is active and deleted logs. retrying.", e);
LOG.warn("Someone else is active and deleted logs. retrying.", e);
oldLogs = getLogFiles();
continue;
}
@ -334,7 +334,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
// Create new state-log
if (!rollWriter(flushLogId + 1)) {
// someone else has already created this log
LOG.debug("someone else has already created log " + flushLogId);
LOG.debug("Someone else has already created log " + flushLogId);
continue;
}
@ -428,7 +428,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
try {
periodicRoll();
} catch (IOException e) {
LOG.warn("unable to cleanup logs on load: " + e.getMessage(), e);
LOG.warn("Unable to cleanup logs on load: " + e.getMessage(), e);
}
}

View File

@ -206,6 +206,18 @@ public class ProcedureTestingUtility {
}
}
public static <TEnv> void waitProcedures(ProcedureExecutor<TEnv> procExecutor, long... procIds) {
for (int i = 0; i < procIds.length; ++i) {
waitProcedure(procExecutor, procIds[i]);
}
}
public static <TEnv> void waitAllProcedures(ProcedureExecutor<TEnv> procExecutor) {
for (long procId : procExecutor.getActiveProcIds()) {
waitProcedure(procExecutor, procId);
}
}
public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
int stableRuns = 0;
while (stableRuns < 10) {

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hbase.pb;
option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "LockServiceProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
import "HBase.proto";
enum LockType {
EXCLUSIVE = 1;
SHARED = 2;
}
message LockRequest {
required LockType lock_type = 1;
optional string namespace = 2;
optional TableName table_name = 3;
repeated RegionInfo region_info = 4;
optional string description = 5;
optional uint64 nonce_group = 6 [default = 0];
optional uint64 nonce = 7 [default = 0];
}
message LockResponse {
required uint64 proc_id = 1;
}
message LockHeartbeatRequest {
required uint64 proc_id = 1;
optional bool keep_alive = 2 [default = true];
}
message LockHeartbeatResponse {
enum LockStatus {
UNLOCKED = 1;
LOCKED = 2;
}
required LockStatus lock_status = 1;
// Timeout of lock (if locked).
optional uint32 timeout_ms = 2;
}
message LockProcedureData {
required LockType lock_type = 1;
optional string namespace = 2;
optional TableName table_name = 3;
repeated RegionInfo region_info = 4;
optional string description = 5;
optional bool is_master_lock = 6 [default = false];
}
service LockService {
/** Acquire lock on namespace/table/region */
rpc RequestLock(LockRequest) returns(LockResponse);
/** Keep alive (or not) a previously acquired lock */
rpc LockHeartbeat(LockHeartbeatRequest) returns(LockHeartbeatResponse);
}
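
A rough Java sketch of the RPC flow these messages imply, written against the generated blocking stub (obtaining the stub and the RpcController/ServiceException handling are assumed; EntityLock wraps this same flow with a heartbeat thread):

    // Queue the lock; the response only carries the procedure id.
    LockRequest req = LockRequest.newBuilder()
        .setLockType(LockType.EXCLUSIVE)
        .setNamespace("ns1")
        .setDescription("example namespace lock")
        .build();
    long procId = stub.requestLock(null, req).getProcId();

    // Heartbeat to keep the lock alive and learn whether it is currently held.
    LockHeartbeatResponse resp = stub.lockHeartbeat(null,
        LockHeartbeatRequest.newBuilder().setProcId(procId).build());
    boolean held = resp.getLockStatus() == LockHeartbeatResponse.LockStatus.LOCKED;
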

View File

@ -20,19 +20,11 @@
package org.apache.hadoop.hbase.rsgroup;
import com.google.common.collect.Sets;
import com.google.common.net.HostAndPort;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -54,12 +46,12 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.locking.LockProcedure.LockType;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
@ -80,12 +72,18 @@ import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesR
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RSGroupAdminService;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import com.google.common.collect.Sets;
import com.google.common.net.HostAndPort;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
@InterfaceAudience.Private
public class RSGroupAdminEndpoint extends RSGroupAdminService
implements CoprocessorService, Coprocessor, MasterObserver {
private static final Log LOG = LogFactory.getLog(RSGroupAdminEndpoint.class);
public class RSGroupAdminEndpoint extends RSGroupAdminService implements CoprocessorService,
Coprocessor, MasterObserver {
private MasterServices master = null;
private static RSGroupInfoManagerImpl groupInfoManager;
@ -97,7 +95,7 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService
master = menv.getMasterServices();
setGroupInfoManager(new RSGroupInfoManagerImpl(master));
groupAdminServer = new RSGroupAdminServer(master, groupInfoManager);
Class clazz =
Class<?> clazz =
master.getConfiguration().getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, null);
if (!RSGroupableBalancer.class.isAssignableFrom(clazz)) {
throw new IOException("Configured balancer is not a GroupableBalancer");
@ -1181,4 +1179,28 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService
final ObserverContext<MasterCoprocessorEnvironment> ctx,
final HRegionInfo[] regionsToMerge) throws IOException {
}
}
@Override
public void preLockHeartbeat(ObserverContext<MasterCoprocessorEnvironment> ctx,
LockProcedure proc, boolean keepAlive) throws IOException {
}
@Override
public void postLockHeartbeat(ObserverContext<MasterCoprocessorEnvironment> ctx,
LockProcedure proc, boolean keepAlive) throws IOException {
}
@Override
public void preRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace, TableName tableName,
HRegionInfo[] regionInfos, LockType type, String description) throws IOException {
// TODO Auto-generated method stub
}
@Override
public void postRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace, TableName tableName,
HRegionInfo[] regionInfos, LockType type, String description) throws IOException {
// TODO Auto-generated method stub
}
}

View File

@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.locking;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.util.Threads;
import com.google.common.annotations.VisibleForTesting;
/**
* Lock for HBase Entity either a Table, a Namespace, or Regions.
*
* These are remote locks which live on master, and need periodic heartbeats to keep them alive.
* (Once we request the lock, internally a heartbeat thread will be started on the client).
* If master does not receive the heartbeat in time, it'll release the lock and make it available
* to other users.
*
* <p>Use {@link LockServiceClient} to build instances. Then call {@link #requestLock()}.
* {@link #requestLock} will contact master to queue the lock and start the heartbeat thread
* which will check lock's status periodically and once the lock is acquired, it will send the
* heartbeats to the master.
*
* <p>Use {@link #await} or {@link #await(long, TimeUnit)} to wait for the lock to be acquired.
* Always call {@link #unlock()} irrespective of whether lock was acquired or not. If the lock
* was acquired, it'll be released. If it was not acquired, it is possible that master grants the
* lock in future and the heartbeat thread keeps it alive forever by sending heartbeats.
* Calling {@link #unlock()} will stop the heartbeat thread and cancel the lock queued on master.
*
* <p>There are 4 ways in which these remote locks may be released/can be lost:
* <ul><li>Call {@link #unlock}.</li>
* <li>Lock times out on master: Can happen because of network issues, GC pauses, etc.
* Worker thread will call the given abortable as soon as it detects such a situation.</li>
* <li>Fail to contact master: If the worker thread cannot contact the master and thus fails to send a
* heartbeat before the timeout expires, it assumes that lock is lost and calls the
* abortable.</li>
* <li>Worker thread is interrupted.</li>
* </ul>
*
* Use example:
* <code>
* EntityLock lock = lockServiceClient.*Lock(...., "exampled lock", abortable);
* lock.requestLock();
* ....
* ....can do other initializations here since lock is 'asynchronous'...
* ....
* if (lock.await(timeout)) {
* ....logic requiring mutual exclusion
* }
* lock.unlock();
* </code>
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class EntityLock {
private static final Log LOG = LogFactory.getLog(EntityLock.class);
public static final String HEARTBEAT_TIME_BUFFER =
"hbase.client.locks.heartbeat.time.buffer.ms";
private final AtomicBoolean locked = new AtomicBoolean(false);
private final CountDownLatch latch = new CountDownLatch(1);
private final LockService.BlockingInterface stub;
private final LockHeartbeatWorker worker;
private final LockRequest lockRequest;
private final Abortable abort;
// Buffer for unexpected delays (GC, network delay, etc) in heartbeat rpc.
private final int heartbeatTimeBuffer;
// set to a non-zero value for tweaking sleep time during testing so that worker doesn't wait
// for long time periods between heartbeats.
private long testingSleepTime = 0;
private Long procId = null;
/**
* Abortable.abort() is called when the lease of the lock will expire.
* It's up to the user to decide whether to simply abort the process or to handle the loss of
* the lock by aborting the operation that was supposed to be under lock.
*/
EntityLock(Configuration conf, LockService.BlockingInterface stub,
LockRequest request, Abortable abort) {
this.stub = stub;
this.lockRequest = request;
this.abort = abort;
this.heartbeatTimeBuffer = conf.getInt(HEARTBEAT_TIME_BUFFER, 10000);
this.worker = new LockHeartbeatWorker(lockRequest.getDescription());
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("EntityLock locked=");
sb.append(locked.get());
sb.append(", procId=");
sb.append(procId);
sb.append(", type=");
sb.append(lockRequest.getLockType());
if (lockRequest.getRegionInfoCount() > 0) {
sb.append(", regions=");
for (int i = 0; i < lockRequest.getRegionInfoCount(); ++i) {
if (i > 0) sb.append(", ");
sb.append(lockRequest.getRegionInfo(i));
}
} else if (lockRequest.hasTableName()) {
sb.append(", table=");
sb.append(lockRequest.getTableName());
} else if (lockRequest.hasNamespace()) {
sb.append(", namespace=");
sb.append(lockRequest.getNamespace());
}
sb.append(", description=");
sb.append(lockRequest.getDescription());
return sb.toString();
}
@VisibleForTesting
void setTestingSleepTime(long timeInMillis) {
testingSleepTime = timeInMillis;
}
@VisibleForTesting
LockHeartbeatWorker getWorker() {
return worker;
}
public boolean isLocked() {
return locked.get();
}
/**
* Sends rpc to the master to request lock.
* The lock request is queued with other lock requests.
*/
public void requestLock() throws IOException {
if (procId == null) {
try {
procId = stub.requestLock(null, lockRequest).getProcId();
} catch (Exception e) {
throw ProtobufUtil.handleRemoteException(e);
}
worker.start();
} else {
LOG.info("Lock already queued : " + toString());
}
}
/**
* @param timeout in milliseconds. If set to 0, waits indefinitely.
* @return true if lock was acquired; and false if waiting time elapsed before lock could be
* acquired.
*/
public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
final boolean result = latch.await(timeout, timeUnit);
String lockRequestStr = lockRequest.toString().replace("\n", ", ");
if (result) {
LOG.info("Acquired " + lockRequestStr);
} else {
LOG.info(String.format("Failed acquire in %s %s of %s", timeout, timeUnit.toString(),
lockRequestStr));
}
return result;
}
public void await() throws InterruptedException {
latch.await();
}
public void unlock() throws IOException {
locked.set(false);
worker.interrupt();
Threads.shutdown(worker);
try {
stub.lockHeartbeat(null,
LockHeartbeatRequest.newBuilder().setProcId(procId).setKeepAlive(false).build());
} catch (Exception e) {
throw ProtobufUtil.handleRemoteException(e);
}
}
protected class LockHeartbeatWorker extends Thread {
public LockHeartbeatWorker(final String desc) {
super("LockHeartbeatWorker(" + desc + ")");
}
public void run() {
final LockHeartbeatRequest lockHeartbeatRequest =
LockHeartbeatRequest.newBuilder().setProcId(procId).build();
LockHeartbeatResponse response;
while (true) {
try {
response = stub.lockHeartbeat(null, lockHeartbeatRequest);
} catch (Exception e) {
e = ProtobufUtil.handleRemoteException(e);
locked.set(false);
LOG.error("Heartbeat failed, releasing " + EntityLock.this, e);
abort.abort("Heartbeat failed", e);
return;
}
if (!isLocked() && response.getLockStatus() == LockHeartbeatResponse.LockStatus.LOCKED) {
locked.set(true);
latch.countDown();
} else if (isLocked() && response.getLockStatus() == LockHeartbeatResponse.LockStatus.UNLOCKED) {
// Lock timed out.
locked.set(false);
abort.abort("Lock timed out.", null);
return;
}
try {
// If lock not acquired yet, poll faster so we can notify faster.
long sleepTime = 1000;
if (isLocked()) {
// If lock acquired, then use lock timeout to determine heartbeat rate.
// If timeout is <heartbeatTimeBuffer, send back to back heartbeats.
sleepTime = Math.max(response.getTimeoutMs() - heartbeatTimeBuffer, 1);
}
if (testingSleepTime != 0) {
sleepTime = testingSleepTime;
}
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
// Since there won't be any more heartbeats, assume lock will be lost.
locked.set(false);
LOG.error("Interrupted, releasing " + EntityLock.this, e);
abort.abort("Worker thread interrupted", e);
return;
}
}
}
}
}

View File

@ -0,0 +1,111 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.locking;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.NonceGenerator;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
/**
* Helper class to create "master locks" for namespaces, tables and regions.
* DEV-NOTE: At the moment this class is used only by the RS for MOB,
* to prevent concurrent MOB compactions from conflicting with each other.
* The RS already has the LockService stub, so we have only one constructor, which
* takes the LockService stub. If in the future this is going to be used in other places,
* we should add a constructor that creates the stub from a conf or connection.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@InterfaceStability.Evolving
public class LockServiceClient {
private final LockService.BlockingInterface stub;
private final Configuration conf;
private final NonceGenerator ng;
public LockServiceClient(final Configuration conf, final LockService.BlockingInterface stub,
final NonceGenerator ng) {
this.conf = conf;
this.stub = stub;
this.ng = ng;
}
/**
* Create a new EntityLock object to acquire an exclusive or shared lock on a table.
* Internally, the table namespace will also be locked in shared mode.
*/
public EntityLock tableLock(final TableName tableName, final boolean exclusive,
final String description, final Abortable abort) {
LockRequest lockRequest = buildLockRequest(exclusive ? LockType.EXCLUSIVE : LockType.SHARED,
tableName.getNameAsString(), null, null, description, ng.getNonceGroup(), ng.newNonce());
return new EntityLock(conf, stub, lockRequest, abort);
}
/**
* Create a new EntityLock object to acquire an exclusive lock on a namespace.
* Clients cannot acquire shared locks on a namespace.
*/
public EntityLock namespaceLock(String namespace, String description, Abortable abort) {
LockRequest lockRequest = buildLockRequest(LockType.EXCLUSIVE,
namespace, null, null, description, ng.getNonceGroup(), ng.newNonce());
return new EntityLock(conf, stub, lockRequest, abort);
}
/**
* Create a new EntityLock object to acquire exclusive lock on multiple regions of same tables.
* Internally, the table and its namespace will also be locked in shared mode.
*/
public EntityLock regionLock(List<HRegionInfo> regionInfos, String description, Abortable abort) {
LockRequest lockRequest = buildLockRequest(LockType.EXCLUSIVE,
null, null, regionInfos, description, ng.getNonceGroup(), ng.newNonce());
return new EntityLock(conf, stub, lockRequest, abort);
}
@VisibleForTesting
public static LockRequest buildLockRequest(final LockType type,
final String namespace, final TableName tableName, final List<HRegionInfo> regionInfos,
final String description, final long nonceGroup, final long nonce) {
final LockRequest.Builder builder = LockRequest.newBuilder()
.setLockType(type)
.setNonceGroup(nonceGroup)
.setNonce(nonce);
if (regionInfos != null) {
for (HRegionInfo hri: regionInfos) {
builder.addRegionInfo(HRegionInfo.convert(hri));
}
} else if (namespace != null) {
builder.setNamespace(namespace);
} else if (tableName != null) {
builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
}
return builder.setDescription(description).build();
}
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
@ -878,4 +879,26 @@ public class BaseMasterAndRegionObserver extends BaseRegionObserver
final ObserverContext<MasterCoprocessorEnvironment> ctx,
final HRegionInfo[] regionsToMerge) throws IOException {
}
@Override
public void preRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace,
TableName tableName, HRegionInfo[] regionInfos, LockProcedure.LockType type,
String description) throws IOException {
}
@Override
public void postRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace,
TableName tableName, HRegionInfo[] regionInfos, LockProcedure.LockType type,
String description) throws IOException {
}
@Override
public void preLockHeartbeat(ObserverContext<MasterCoprocessorEnvironment> ctx,
LockProcedure proc, boolean keepAlive) throws IOException {
}
@Override
public void postLockHeartbeat(ObserverContext<MasterCoprocessorEnvironment> ctx,
LockProcedure proc, boolean keepAlive) throws IOException {
}
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
@ -1147,6 +1148,28 @@ public class BaseMasterObserver implements MasterObserver {
@Override
public void postBalanceRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
String groupName, boolean balancerRan) throws IOException {
String groupName, boolean balancerRan) throws IOException {
}
@Override
public void preRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace,
TableName tableName, HRegionInfo[] regionInfos, LockProcedure.LockType type,
String description) throws IOException {
}
@Override
public void postRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace,
TableName tableName, HRegionInfo[] regionInfos, LockProcedure.LockType type,
String description) throws IOException {
}
@Override
public void preLockHeartbeat(ObserverContext<MasterCoprocessorEnvironment> ctx,
LockProcedure proc, boolean keepAlive) throws IOException {
}
@Override
public void postLockHeartbeat(ObserverContext<MasterCoprocessorEnvironment> ctx,
LockProcedure proc, boolean keepAlive) throws IOException {
}
}

View File

@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.coprocessor;
import com.google.common.net.HostAndPort;
import java.io.IOException;
import java.util.List;
import java.util.Set;
@ -40,13 +38,15 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import com.google.common.net.HostAndPort;
/**
* Defines coprocessor hooks for interacting with operations on the
* {@link org.apache.hadoop.hbase.master.HMaster} process.
@ -1693,7 +1693,7 @@ public interface MasterObserver extends Coprocessor {
final String namespace, final Quotas quotas) throws IOException;
/**
* Called before dispatching region merge request.
* Called before dispatching region merge request.
* It can't bypass the default action, e.g., ctx.bypass() won't have effect.
* @param ctx coprocessor environment
* @param regionA first region to be merged
@ -1702,7 +1702,7 @@ public interface MasterObserver extends Coprocessor {
*/
void preDispatchMerge(final ObserverContext<MasterCoprocessorEnvironment> ctx,
HRegionInfo regionA, HRegionInfo regionB) throws IOException;
/**
* called after dispatching the region merge request.
* @param c coprocessor environment
@ -1971,4 +1971,30 @@ public interface MasterObserver extends Coprocessor {
default void postListReplicationPeers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String regex) throws IOException {
}
/**
* Called before new LockProcedure is queued.
*/
public void preRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace,
TableName tableName, HRegionInfo[] regionInfos, LockProcedure.LockType type,
String description) throws IOException;
/**
* Called after new LockProcedure is queued.
*/
public void postRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace,
TableName tableName, HRegionInfo[] regionInfos, LockProcedure.LockType type,
String description) throws IOException;
/**
* Called before heartbeat to a lock.
*/
public void preLockHeartbeat(ObserverContext<MasterCoprocessorEnvironment> ctx,
LockProcedure proc, boolean keepAlive) throws IOException;
/**
* Called after heartbeat to a lock.
*/
public void postLockHeartbeat(ObserverContext<MasterCoprocessorEnvironment> ctx,
LockProcedure proc, boolean keepAlive) throws IOException;
}

View File

@ -96,6 +96,7 @@ import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
@ -322,6 +323,8 @@ public class HMaster extends HRegionServer implements MasterServices {
// Maximum percent of regions in transition when balancing
private final double maxRitPercent;
private final LockManager lockManager = new LockManager(this);
private LoadBalancer balancer;
private RegionNormalizer normalizer;
private BalancerChore balancerChore;
@ -674,7 +677,6 @@ public class HMaster extends HRegionServer implements MasterServices {
this.mpmHost.register(new MasterFlushTableProcedureManager());
this.mpmHost.loadProcedures(conf);
this.mpmHost.initialize(this, this.metricsMaster);
}
/**
@ -3289,4 +3291,9 @@ public class HMaster extends HRegionServer implements MasterServices {
this.zooKeeper.prefix("Unable to remove drain for '" + server.getServerName() + "'."), ke);
}
}
@Override
public LockManager getLockManager() {
return lockManager;
}
}

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@ -1789,4 +1790,46 @@ public class MasterCoprocessorHost
}
});
}
public void preRequestLock(String namespace, TableName tableName, HRegionInfo[] regionInfos,
LockProcedure.LockType type, String description) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
oserver.preRequestLock(ctx, namespace, tableName, regionInfos, type, description);
}
});
}
public void postRequestLock(String namespace, TableName tableName, HRegionInfo[] regionInfos,
LockProcedure.LockType type, String description) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
oserver.postRequestLock(ctx, namespace, tableName, regionInfos, type, description);
}
});
}
public void preLockHeartbeat(LockProcedure proc, boolean keepAlive) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
oserver.preLockHeartbeat(ctx, proc, keepAlive);
}
});
}
public void postLockHeartbeat(LockProcedure proc, boolean keepAlive) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
oserver.postLockHeartbeat(ctx, proc, keepAlive);
}
});
}
}

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
@ -72,6 +73,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpeci
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.*;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.*;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse.Capability;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
@ -108,7 +110,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessController;
import org.apache.hadoop.hbase.security.visibility.VisibilityController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
@ -125,7 +126,8 @@ import org.apache.zookeeper.KeeperException;
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public class MasterRpcServices extends RSRpcServices
implements MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface {
implements MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
LockService.BlockingInterface {
private static final Log LOG = LogFactory.getLog(MasterRpcServices.class.getName());
private final HMaster master;
@ -1796,4 +1798,66 @@ public class MasterRpcServices extends RSRpcServices
return response.build();
}
@Override
public LockResponse requestLock(RpcController controller, LockRequest request)
throws ServiceException {
try {
if (request.getDescription().isEmpty()) {
throw new IllegalArgumentException("Empty description");
}
final long procId;
LockProcedure.LockType type = LockProcedure.LockType.valueOf(request.getLockType().name());
if (request.getRegionInfoCount() > 0) {
final HRegionInfo[] regionInfos = new HRegionInfo[request.getRegionInfoCount()];
for (int i = 0; i < request.getRegionInfoCount(); ++i) {
regionInfos[i] = HRegionInfo.convert(request.getRegionInfo(i));
}
procId = master.getLockManager().remoteLocks().requestRegionsLock(regionInfos,
request.getDescription(), request.getNonceGroup(), request.getNonce());
return LockResponse.newBuilder().setProcId(procId).build();
} else if (request.hasTableName()) {
final TableName tableName = ProtobufUtil.toTableName(request.getTableName());
procId = master.getLockManager().remoteLocks().requestTableLock(tableName, type,
request.getDescription(), request.getNonceGroup(), request.getNonce());
return LockResponse.newBuilder().setProcId(procId).build();
} else if (request.hasNamespace()) {
procId = master.getLockManager().remoteLocks().requestNamespaceLock(
request.getNamespace(), type, request.getDescription(),
request.getNonceGroup(), request.getNonce());
} else {
throw new IllegalArgumentException("one of table/namespace/region should be specified");
}
return LockResponse.newBuilder().setProcId(procId).build();
} catch (IllegalArgumentException e) {
LOG.warn("Exception when queuing lock", e);
throw new ServiceException(new DoNotRetryIOException(e));
} catch (IOException e) {
LOG.warn("Exception when queuing lock", e);
throw new ServiceException(e);
}
}
/**
* @return LOCKED, if procedure is found and it has the lock; else UNLOCKED.
* @throws ServiceException if given proc id is found but it is not a LockProcedure.
*/
@Override
public LockHeartbeatResponse lockHeartbeat(RpcController controller, LockHeartbeatRequest request)
throws ServiceException {
try {
if (master.getLockManager().remoteLocks().lockHeartbeat(request.getProcId(),
request.getKeepAlive())) {
return LockHeartbeatResponse.newBuilder().setTimeoutMs(
master.getConfiguration().getInt(LockProcedure.REMOTE_LOCKS_TIMEOUT_MS_CONF,
LockProcedure.DEFAULT_REMOTE_LOCKS_TIMEOUT_MS))
.setLockStatus(LockHeartbeatResponse.LockStatus.LOCKED).build();
} else {
return LockHeartbeatResponse.newBuilder()
.setLockStatus(LockHeartbeatResponse.LockStatus.UNLOCKED).build();
}
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@ -488,4 +489,8 @@ public interface MasterServices extends Server {
*/
void removeDrainFromRegionServer(final ServerName server);
/**
* @return {@link LockManager} to lock namespaces/tables/regions.
*/
LockManager getLockManager();
}

View File

@ -0,0 +1,271 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.locking;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.util.NonceKey;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Functions to acquire lock on table/namespace/regions.
*/
@InterfaceAudience.Private
public final class LockManager {
private static final Log LOG = LogFactory.getLog(LockManager.class);
private final HMaster master;
private final RemoteLocks remoteLocks;
public LockManager(HMaster master) {
this.master = master;
this.remoteLocks = new RemoteLocks();
}
public RemoteLocks remoteLocks() {
return remoteLocks;
}
public MasterLock createMasterLock(final String namespace,
final LockProcedure.LockType type, final String description) {
return new MasterLock(namespace, type, description);
}
public MasterLock createMasterLock(final TableName tableName,
final LockProcedure.LockType type, final String description) {
return new MasterLock(tableName, type, description);
}
public MasterLock createMasterLock(final HRegionInfo[] regionInfos, final String description) {
return new MasterLock(regionInfos, description);
}
private void submitProcedure(final LockProcedure proc, final long nonceGroup, final long nonce) {
proc.setOwner(master.getMasterProcedureExecutor().getEnvironment().getRequestUser());
final NonceKey nonceKey = master.getMasterProcedureExecutor().createNonceKey(nonceGroup, nonce);
master.getMasterProcedureExecutor().submitProcedure(proc, nonceKey);
}
/**
* Locks on namespace/table/regions.
* Underneath, uses procedure framework and queues a {@link LockProcedure} which waits in a
* queue until scheduled.
* Use this lock instead of LockManager.remoteLocks() for MASTER ONLY operations, for two advantages:
* - no need to poll the LockProcedure to check if the lock was acquired.
* - generous timeout for lock preemption (default 10 min), so there is no need to spawn a thread for heartbeats.
* (timeout configuration {@link LockProcedure#DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS}).
*/
public class MasterLock {
private final String namespace;
private final TableName tableName;
private final HRegionInfo[] regionInfos;
private final LockProcedure.LockType type;
private final String description;
private LockProcedure proc = null;
public MasterLock(final String namespace,
final LockProcedure.LockType type, final String description) {
this.namespace = namespace;
this.tableName = null;
this.regionInfos = null;
this.type = type;
this.description = description;
}
public MasterLock(final TableName tableName,
final LockProcedure.LockType type, final String description) {
this.namespace = null;
this.tableName = tableName;
this.regionInfos = null;
this.type = type;
this.description = description;
}
public MasterLock(final HRegionInfo[] regionInfos, final String description) {
this.namespace = null;
this.tableName = null;
this.regionInfos = regionInfos;
this.type = LockProcedure.LockType.EXCLUSIVE;
this.description = description;
}
/**
* Acquire the lock, waiting indefinitely until the lock is acquired or
* the thread is interrupted.
* @throws InterruptedException If the current thread is interrupted while
* waiting for the lock.
*/
public boolean acquire() throws InterruptedException {
return tryAcquire(0);
}
/**
* Acquire the lock within a wait time.
* @param timeoutMs The maximum time (in milliseconds) to wait for the lock,
* 0 to wait indefinitely
* @return True if the lock was acquired, false if waiting time elapsed
* before the lock was acquired
* @throws InterruptedException If the thread is interrupted while waiting to
* acquire the lock
*/
public boolean tryAcquire(final long timeoutMs) throws InterruptedException {
if (proc != null && proc.isLocked()) {
return true;
}
// Use new condition and procedure every time lock is requested.
final CountDownLatch lockAcquireLatch = new CountDownLatch(1);
if (regionInfos != null) {
proc = new LockProcedure(master.getConfiguration(), regionInfos, type,
description, lockAcquireLatch);
} else if (tableName != null) {
proc = new LockProcedure(master.getConfiguration(), tableName, type,
description, lockAcquireLatch);
} else if (namespace != null) {
proc = new LockProcedure(master.getConfiguration(), namespace, type,
description, lockAcquireLatch);
} else {
throw new UnsupportedOperationException("no namespace/table/region provided");
}
// The user of a MasterLock should be 'hbase'; the only case where this is not true
// is when a coprocessor tries to take a master lock (which should be avoided).
proc.setOwner(master.getMasterProcedureExecutor().getEnvironment().getRequestUser());
master.getMasterProcedureExecutor().submitProcedure(proc);
long deadline = (timeoutMs > 0) ? System.currentTimeMillis() + timeoutMs : Long.MAX_VALUE;
while (deadline >= System.currentTimeMillis() && !proc.isLocked()) {
try {
lockAcquireLatch.await(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.info("InterruptedException when waiting for lock: " + proc.toString());
// kind of weird, releasing a lock which is not locked. This is to make the procedure
// finish immediately whenever it gets scheduled so that it doesn't hold the lock.
release();
throw e;
}
}
if (!proc.isLocked()) {
LOG.info("Timed out waiting to acquire procedure lock: " + proc.toString());
release();
return false;
}
return true;
}
/**
* Release the lock.
* No-op if the lock was never acquired.
*/
public void release() {
if (proc != null) {
proc.unlock(master.getMasterProcedureExecutor().getEnvironment());
}
proc = null;
}
@Override
public String toString() {
return "MasterLock: proc = " + proc.toString();
}
@VisibleForTesting
LockProcedure getProc() {
return proc;
}
}
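A minimal usage sketch of MasterLock (illustration only, not part of this patch): a master-internal operation takes an exclusive table lock, does its work, and releases the lock in a finally block. The MasterServices handle, the method name, the 30-second timeout and the description string are assumptions.

  // Sketch: a master-only operation guarded by an exclusive table lock.
  void runWithTableLock(final MasterServices master, final TableName tableName)
      throws IOException, InterruptedException {
    final LockManager.MasterLock lock = master.getLockManager().createMasterLock(
        tableName, LockProcedure.LockType.EXCLUSIVE, "example master-only operation");
    if (!lock.tryAcquire(30000)) {  // wait up to 30s; tryAcquire(0) waits indefinitely
      throw new IOException("Timed out waiting for exclusive lock on " + tableName);
    }
    try {
      // ... work that requires the table to be held exclusively ...
    } finally {
      lock.release();  // no-op if the lock was never acquired
    }
  }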
/**
* Locks on namespace/table/regions for remote operations.
* Since remote operations are unreliable and the client/RS may die anytime and never release
* locks, regular heartbeats are required to keep the lock held.
*/
public class RemoteLocks {
public long requestNamespaceLock(final String namespace, final LockProcedure.LockType type,
final String description, final long nonceGroup, final long nonce)
throws IllegalArgumentException, IOException {
master.getMasterCoprocessorHost().preRequestLock(namespace, null, null, type, description);
final LockProcedure proc = new LockProcedure(master.getConfiguration(), namespace,
type, description, null);
submitProcedure(proc, nonceGroup, nonce);
master.getMasterCoprocessorHost().postRequestLock(namespace, null, null, type, description);
return proc.getProcId();
}
public long requestTableLock(final TableName tableName, final LockProcedure.LockType type,
final String description, final long nonceGroup, final long nonce)
throws IllegalArgumentException, IOException {
master.getMasterCoprocessorHost().preRequestLock(null, tableName, null, type, description);
final LockProcedure proc = new LockProcedure(master.getConfiguration(), tableName,
type, description, null);
submitProcedure(proc, nonceGroup, nonce);
master.getMasterCoprocessorHost().postRequestLock(null, tableName, null, type, description);
return proc.getProcId();
}
/**
* @throws IllegalArgumentException if the regions are not all from the same table.
*/
public long requestRegionsLock(final HRegionInfo[] regionInfos, final String description,
final long nonceGroup, final long nonce) throws IllegalArgumentException, IOException {
master.getMasterCoprocessorHost().preRequestLock(null, null, regionInfos,
LockProcedure.LockType.EXCLUSIVE, description);
final LockProcedure proc = new LockProcedure(master.getConfiguration(), regionInfos,
LockProcedure.LockType.EXCLUSIVE, description, null);
submitProcedure(proc, nonceGroup, nonce);
master.getMasterCoprocessorHost().postRequestLock(null, null, regionInfos,
LockProcedure.LockType.EXCLUSIVE, description);
return proc.getProcId();
}
/**
* @param keepAlive if false, release the lock.
* @return true, if procedure is found and it has the lock; else false.
*/
public boolean lockHeartbeat(final long procId, final boolean keepAlive) throws IOException {
final LockProcedure proc = master.getMasterProcedureExecutor()
.getProcedure(LockProcedure.class, procId);
if (proc == null) return false;
master.getMasterCoprocessorHost().preLockHeartbeat(proc, keepAlive);
proc.updateHeartBeat();
if (!keepAlive) {
proc.unlock(master.getMasterProcedureExecutor().getEnvironment());
}
master.getMasterCoprocessorHost().postLockHeartbeat(proc, keepAlive);
return proc.isLocked();
}
}
}
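For the remote path, the lock holder has to renew the lock periodically: if no heartbeat arrives within the LockProcedure timeout, the procedure times out and releases the lock on its own. A hedged keep-alive sketch follows; the lockManager/conf/procId variables, the half-timeout renewal interval, and the java.util.concurrent imports are assumptions rather than anything defined in this patch.

  // Sketch: renew a remote lock (obtained via requestTableLock/requestNamespaceLock/
  // requestRegionsLock) at half the configured heartbeat timeout.
  final long timeoutMs = conf.getInt(LockProcedure.REMOTE_LOCKS_TIMEOUT_MS_CONF,
      LockProcedure.DEFAULT_REMOTE_LOCKS_TIMEOUT_MS);
  final ScheduledExecutorService heartbeater = Executors.newSingleThreadScheduledExecutor();
  heartbeater.scheduleAtFixedRate(() -> {
    try {
      // keepAlive=true extends the deadline; keepAlive=false releases the lock.
      if (!lockManager.remoteLocks().lockHeartbeat(procId, true)) {
        heartbeater.shutdown();  // procedure is gone or the lock was already lost
      }
    } catch (IOException e) {
      heartbeater.shutdown();
    }
  }, timeoutMs / 2, timeoutMs / 2, TimeUnit.MILLISECONDS);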

View File

@ -0,0 +1,462 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.locking;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockProcedureData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@InterfaceAudience.Private
public final class LockProcedure extends Procedure<MasterProcedureEnv>
implements TableProcedureInterface {
private static final Log LOG = LogFactory.getLog(LockProcedure.class);
public static final int DEFAULT_REMOTE_LOCKS_TIMEOUT_MS = 30000; // timeout in ms
public static final String REMOTE_LOCKS_TIMEOUT_MS_CONF =
"hbase.master.procedure.remote.locks.timeout.ms";
// 10 min. Same as old ZK lock timeout.
public static final int DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS = 600000;
public static final String LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF =
"hbase.master.procedure.local.master.locks.timeout.ms";
// Also used in serialized states, changes will affect backward compatibility.
public enum LockType { SHARED, EXCLUSIVE }
private String namespace;
private TableName tableName;
private HRegionInfo[] regionInfos;
private LockType type;
// underlying namespace/table/region lock.
private LockInterface lock;
private TableOperationType opType;
private String description;
// True when recovering a master lock from the WALs.
private boolean recoveredMasterLock;
// this is for internal working
private boolean hasLock;
private final ProcedureEvent<LockProcedure> event = new ProcedureEvent<LockProcedure>(this);
// True if this proc acquired relevant locks. This value is for client checks.
private final AtomicBoolean locked = new AtomicBoolean(false);
// Last system time (in ms) when client sent the heartbeat.
// Initialize to system time for non-null value in case of recovery.
private final AtomicLong lastHeartBeat = new AtomicLong();
// Set to true when unlock request is received.
private final AtomicBoolean unlock = new AtomicBoolean(false);
// Counted down when the lock is acquired. Only used for local (within the master process) locks.
// Setting the latch to a non-null value increases the default timeout to
// DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS (10 min) so that there is no need to heartbeat.
private final CountDownLatch lockAcquireLatch;
@Override
public TableName getTableName() {
return tableName;
}
@Override
public TableOperationType getTableOperationType() {
return opType;
}
private interface LockInterface {
boolean acquireLock(MasterProcedureEnv env);
void releaseLock(MasterProcedureEnv env);
}
public LockProcedure() {
lockAcquireLatch = null;
}
private LockProcedure(final Configuration conf, final LockType type,
final String description, final CountDownLatch lockAcquireLatch) {
this.type = type;
this.description = description;
this.lockAcquireLatch = lockAcquireLatch;
if (lockAcquireLatch == null) {
setTimeout(conf.getInt(REMOTE_LOCKS_TIMEOUT_MS_CONF, DEFAULT_REMOTE_LOCKS_TIMEOUT_MS));
} else {
setTimeout(conf.getInt(LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF,
DEFAULT_LOCAL_MASTER_LOCKS_TIMEOUT_MS));
}
}
/**
* Constructor for namespace lock.
* @param lockAcquireLatch if not null, the latch is counted down when the lock is acquired.
*/
public LockProcedure(final Configuration conf, final String namespace, final LockType type,
final String description, final CountDownLatch lockAcquireLatch)
throws IllegalArgumentException {
this(conf, type, description, lockAcquireLatch);
if (namespace.isEmpty()) {
throw new IllegalArgumentException("Empty namespace");
}
this.namespace = namespace;
this.lock = setupNamespaceLock();
}
/**
* Constructor for table lock.
* @param lockAcquireLatch if not null, the latch is counted down when the lock is acquired.
*/
public LockProcedure(final Configuration conf, final TableName tableName, final LockType type,
final String description, final CountDownLatch lockAcquireLatch)
throws IllegalArgumentException {
this(conf, type, description, lockAcquireLatch);
this.tableName = tableName;
this.lock = setupTableLock();
}
/**
* Constructor for region lock(s).
* @param lockAcquireLatch if not null, the latch is counted down when the lock is acquired.
*        Useful for locks acquired locally from within the master process.
* @throws IllegalArgumentException if the regions are not all from the same table.
*/
public LockProcedure(final Configuration conf, final HRegionInfo[] regionInfos,
final LockType type, final String description, final CountDownLatch lockAcquireLatch)
throws IllegalArgumentException {
this(conf, type, description, lockAcquireLatch);
// Build HRegionInfo from region names.
if (regionInfos.length == 0) {
throw new IllegalArgumentException("No regions specified for region lock");
}
// check all regions belong to same table.
final TableName regionTable = regionInfos[0].getTable();
for (int i = 1; i < regionInfos.length; ++i) {
if (!regionInfos[i].getTable().equals(regionTable)) {
throw new IllegalArgumentException("All regions should be from same table");
}
}
this.regionInfos = regionInfos;
this.lock = setupRegionLock();
}
private boolean hasHeartbeatExpired() {
return System.currentTimeMillis() - lastHeartBeat.get() >= getTimeout();
}
/**
* Updates timeout deadline for the lock.
*/
public void updateHeartBeat() {
lastHeartBeat.set(System.currentTimeMillis());
if (LOG.isDebugEnabled()) {
LOG.debug("Update heartbeat. Proc: " + toString());
}
}
/**
* Re-run the procedure after every timeout to write new WAL entries so we don't hold back old
* WALs.
* @return false, so the procedure framework doesn't mark this procedure as failed.
*/
protected boolean setTimeoutFailure(final MasterProcedureEnv env) {
synchronized (event) {
if (!event.isReady()) { // maybe unlock() awakened the event.
setState(ProcedureProtos.ProcedureState.RUNNABLE);
env.getProcedureScheduler().wakeEvent(event);
}
}
return false; // false: do not mark the procedure as failed.
}
// Can be called before procedure gets scheduled, in which case, the execute() will finish
// immediately and release the underlying locks.
public void unlock(final MasterProcedureEnv env) {
unlock.set(true);
locked.set(false);
// Maybe timeout already awakened the event and the procedure has finished.
synchronized (event) {
if (!event.isReady()) {
setState(ProcedureProtos.ProcedureState.RUNNABLE);
env.getProcedureScheduler().wakeEvent(event);
}
}
}
@Override
protected Procedure[] execute(final MasterProcedureEnv env) throws ProcedureSuspendedException {
// Local master locks don't store any state, so on recovery, simply finish this procedure
// immediately.
if (recoveredMasterLock) return null;
if (lockAcquireLatch != null) {
lockAcquireLatch.countDown();
}
if (unlock.get() || hasHeartbeatExpired()) {
locked.set(false);
LOG.debug((unlock.get() ? "UNLOCKED - " : "TIMED OUT - ") + toString());
return null;
}
synchronized (event) {
env.getProcedureScheduler().suspendEvent(event);
env.getProcedureScheduler().waitEvent(event, this);
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
}
throw new ProcedureSuspendedException();
}
@Override
protected void rollback(final MasterProcedureEnv env) {
throw new UnsupportedOperationException();
}
@Override
protected boolean abort(final MasterProcedureEnv env) {
unlock(env);
return true;
}
@Override
protected void serializeStateData(final OutputStream stream) throws IOException {
final LockProcedureData.Builder builder = LockProcedureData.newBuilder()
.setLockType(LockServiceProtos.LockType.valueOf(type.name()))
.setDescription(description);
if (regionInfos != null) {
for (int i = 0; i < regionInfos.length; ++i) {
builder.addRegionInfo(HRegionInfo.convert(regionInfos[i]));
}
} else if (namespace != null) {
builder.setNamespace(namespace);
} else if (tableName != null) {
builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
}
if (lockAcquireLatch != null) {
builder.setIsMasterLock(true);
}
builder.build().writeDelimitedTo(stream);
}
@Override
protected void deserializeStateData(final InputStream stream) throws IOException {
final LockProcedureData state = LockProcedureData.parseDelimitedFrom(stream);
type = LockType.valueOf(state.getLockType().name());
description = state.getDescription();
if (state.getRegionInfoCount() > 0) {
regionInfos = new HRegionInfo[state.getRegionInfoCount()];
for (int i = 0; i < state.getRegionInfoCount(); ++i) {
regionInfos[i] = HRegionInfo.convert(state.getRegionInfo(i));
}
} else if (state.hasNamespace()) {
namespace = state.getNamespace();
} else if (state.hasTableName()) {
tableName = ProtobufUtil.toTableName(state.getTableName());
}
recoveredMasterLock = state.getIsMasterLock();
this.lock = setupLock();
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
boolean ret = lock.acquireLock(env);
locked.set(ret);
hasLock = ret;
if (ret) {
if (LOG.isDebugEnabled()) {
LOG.debug("LOCKED - " + toString());
}
lastHeartBeat.set(System.currentTimeMillis());
}
return ret;
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
lock.releaseLock(env);
hasLock = false;
}
/**
* On recovery, re-execute from start to acquire the locks.
* Need to explicitly set the state to RUNNABLE because the procedure might have been in
* WAITING_TIMEOUT state when the crash happened, in which case it would be sent back to the
* timeout queue on recovery, which we don't want since we want to reacquire the locks.
*/
@Override
protected void beforeReplay(MasterProcedureEnv env) {
setState(ProcedureProtos.ProcedureState.RUNNABLE);
}
protected void toStringClassDetails(final StringBuilder builder) {
super.toStringClassDetails(builder);
if (regionInfos != null) {
builder.append(" regions=");
for (int i = 0; i < regionInfos.length; ++i) {
if (i > 0) builder.append(",");
builder.append(regionInfos[i].getShortNameToLog());
}
} else if (namespace != null) {
builder.append(", namespace=").append(namespace);
} else if (tableName != null) {
builder.append(", tableName=").append(tableName);
}
builder.append(", type=").append(type);
}
private LockInterface setupLock() throws IllegalArgumentException {
if (regionInfos != null) {
return setupRegionLock();
} else if (namespace != null) {
return setupNamespaceLock();
} else if (tableName != null) {
return setupTableLock();
} else {
LOG.error("Unknown level specified in proc - " + toString());
throw new IllegalArgumentException("no namespace/table/region provided");
}
}
private LockInterface setupNamespaceLock() throws IllegalArgumentException {
this.tableName = TableName.NAMESPACE_TABLE_NAME;
switch (type) {
case EXCLUSIVE:
this.opType = TableOperationType.EDIT;
return new NamespaceExclusiveLock();
case SHARED:
LOG.error("Shared lock on namespace not supported. Proc - " + toString());
throw new IllegalArgumentException("Shared lock on namespace not supported");
default:
LOG.error("Unexpected lock type in proc - " + toString());
throw new IllegalArgumentException("Wrong lock type: " + type.toString());
}
}
private LockInterface setupTableLock() throws IllegalArgumentException {
switch (type) {
case EXCLUSIVE:
this.opType = TableOperationType.EDIT;
return new TableExclusiveLock();
case SHARED:
this.opType = TableOperationType.READ;
return new TableSharedLock();
default:
LOG.error("Unexpected lock type in proc - " + toString());
throw new IllegalArgumentException("Wrong lock type:" + type.toString());
}
}
private LockInterface setupRegionLock() throws IllegalArgumentException {
this.tableName = regionInfos[0].getTable();
switch (type) {
case EXCLUSIVE:
this.opType = TableOperationType.REGION_EDIT;
return new RegionExclusiveLock();
default:
LOG.error("Only exclusive lock supported on regions. Proc - " + toString());
throw new IllegalArgumentException("Only exclusive lock supported on regions.");
}
}
public String getDescription() {
return description;
}
public boolean isLocked() {
return locked.get();
}
@Override
public boolean holdLock(final MasterProcedureEnv env) {
return true;
}
@Override
public boolean hasLock(final MasterProcedureEnv env) {
return hasLock;
}
///////////////////////
// LOCK IMPLEMENTATIONS
///////////////////////
private class TableExclusiveLock implements LockInterface {
@Override
public boolean acquireLock(final MasterProcedureEnv env) {
return env.getProcedureScheduler().tryAcquireTableExclusiveLock(LockProcedure.this, tableName);
}
@Override
public void releaseLock(final MasterProcedureEnv env) {
env.getProcedureScheduler().releaseTableExclusiveLock(LockProcedure.this, tableName);
}
}
private class TableSharedLock implements LockInterface {
@Override
public boolean acquireLock(final MasterProcedureEnv env) {
return env.getProcedureScheduler().tryAcquireTableSharedLock(LockProcedure.this, tableName);
}
@Override
public void releaseLock(final MasterProcedureEnv env) {
env.getProcedureScheduler().releaseTableSharedLock(LockProcedure.this, tableName);
}
}
private class NamespaceExclusiveLock implements LockInterface {
@Override
public boolean acquireLock(final MasterProcedureEnv env) {
return env.getProcedureScheduler().tryAcquireNamespaceExclusiveLock(
LockProcedure.this, namespace);
}
@Override
public void releaseLock(final MasterProcedureEnv env) {
env.getProcedureScheduler().releaseNamespaceExclusiveLock(
LockProcedure.this, namespace);
}
}
private class RegionExclusiveLock implements LockInterface {
@Override
public boolean acquireLock(final MasterProcedureEnv env) {
return !env.getProcedureScheduler().waitRegions(LockProcedure.this, tableName, regionInfos);
}
@Override
public void releaseLock(final MasterProcedureEnv env) {
env.getProcedureScheduler().wakeRegions(LockProcedure.this, tableName, regionInfos);
}
}
}
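The lockAcquireLatch above is what makes the local (in-master) path synchronous: once the scheduler grants the lock, execute() counts the latch down and the caller simply blocks on it. This mirrors what LockManager.MasterLock does internally. The sketch below assumes a ProcedureExecutor<MasterProcedureEnv> named procExec, a Configuration conf, a TableName, and an arbitrary 5-second wait.

  // Sketch: take an exclusive table lock synchronously by blocking on the acquire latch.
  final CountDownLatch acquired = new CountDownLatch(1);
  final LockProcedure proc = new LockProcedure(conf, tableName,
      LockProcedure.LockType.EXCLUSIVE, "example local lock", acquired);
  procExec.submitProcedure(proc);
  if (acquired.await(5000, TimeUnit.MILLISECONDS) && proc.isLocked()) {
    try {
      // ... work done while the table is locked ...
    } finally {
      proc.unlock(procExec.getEnvironment());  // wakes the suspended procedure so it can finish
    }
  }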

View File

@ -46,7 +46,7 @@ public class IdLock {
public static class Entry {
private final long id;
private int numWaiters;
private boolean isLocked = true;
private boolean locked = true;
private Entry(long id) {
this.id = id;
@ -54,7 +54,7 @@ public class IdLock {
public String toString() {
return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
+ isLocked;
+ locked;
}
}
@ -74,9 +74,9 @@ public class IdLock {
Entry existing;
while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
synchronized (existing) {
if (existing.isLocked) {
if (existing.locked) {
++existing.numWaiters; // Add ourselves to waiters.
while (existing.isLocked) {
while (existing.locked) {
try {
existing.wait();
} catch (InterruptedException e) {
@ -87,7 +87,7 @@ public class IdLock {
}
--existing.numWaiters; // Remove ourselves from waiters.
existing.isLocked = true;
existing.locked = true;
return existing;
}
// If the entry is not locked, it might already be deleted from the
@ -107,7 +107,7 @@ public class IdLock {
*/
public void releaseLockEntry(Entry entry) {
synchronized (entry) {
entry.isLocked = false;
entry.locked = false;
if (entry.numWaiters > 0) {
entry.notify();
} else {

View File

@ -0,0 +1,182 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.locking;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.*;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mortbay.log.Log;
import static org.mockito.Mockito.*;
import static org.junit.Assert.*;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@Category({ClientTests.class, SmallTests.class})
public class TestEntityLocks {
private final Configuration conf = HBaseConfiguration.create();
private final LockService.BlockingInterface master =
Mockito.mock(LockService.BlockingInterface.class);
private LockServiceClient admin;
private ArgumentCaptor<LockRequest> lockReqArgCaptor;
private ArgumentCaptor<LockHeartbeatRequest> lockHeartbeatReqArgCaptor;
private static final LockHeartbeatResponse UNLOCKED_RESPONSE =
LockHeartbeatResponse.newBuilder().setLockStatus(
LockHeartbeatResponse.LockStatus.UNLOCKED).build();
// timeout such that worker thread waits for 500ms for each heartbeat.
private static final LockHeartbeatResponse LOCKED_RESPONSE =
LockHeartbeatResponse.newBuilder().setLockStatus(
LockHeartbeatResponse.LockStatus.LOCKED).setTimeoutMs(10000).build();
private long procId;
// Setup mock admin.
LockServiceClient getAdmin() throws Exception {
conf.setInt("hbase.client.retries.number", 3);
conf.setInt("hbase.client.pause", 1); // 1ms. Immediately retry rpc on failure.
return new LockServiceClient(conf, master, PerClientRandomNonceGenerator.get());
}
@Before
public void setUp() throws Exception {
admin = getAdmin();
lockReqArgCaptor = ArgumentCaptor.forClass(LockRequest.class);
lockHeartbeatReqArgCaptor = ArgumentCaptor.forClass(LockHeartbeatRequest.class);
procId = new Random().nextLong();
}
private boolean waitLockTimeOut(EntityLock lock, long maxWaitTimeMillis) {
long startMillis = System.currentTimeMillis();
while (lock.isLocked()) {
Log.info("Sleeping...");
Threads.sleepWithoutInterrupt(100);
if (!lock.isLocked()) {
return true;
}
if (System.currentTimeMillis() - startMillis > maxWaitTimeMillis) {
Log.info("Timedout...");
return false;
}
}
return true; // to make compiler happy.
}
/**
* Test basic lock function - requestLock, await, unlock.
* @throws Exception
*/
@Test
public void testEntityLock() throws Exception {
final long procId = 100;
final long workerSleepTime = 200; // in ms
EntityLock lock = admin.namespaceLock("namespace", "description", null);
lock.setTestingSleepTime(workerSleepTime);
when(master.requestLock(any(), any())).thenReturn(
LockResponse.newBuilder().setProcId(procId).build());
when(master.lockHeartbeat(any(), any())).thenReturn(
UNLOCKED_RESPONSE, UNLOCKED_RESPONSE, UNLOCKED_RESPONSE, LOCKED_RESPONSE);
lock.requestLock();
// we return unlock response 3 times, so actual wait time should be around 2 * workerSleepTime
lock.await(4 * workerSleepTime, TimeUnit.MILLISECONDS);
assertTrue(lock.isLocked());
lock.unlock();
assertTrue(!lock.getWorker().isAlive());
assertFalse(lock.isLocked());
// check LockRequest in requestLock()
verify(master, times(1)).requestLock(any(), lockReqArgCaptor.capture());
LockRequest request = lockReqArgCaptor.getValue();
assertEquals("namespace", request.getNamespace());
assertEquals("description", request.getDescription());
assertEquals(LockType.EXCLUSIVE, request.getLockType());
assertEquals(0, request.getRegionInfoCount());
// check LockHeartbeatRequest in lockHeartbeat()
verify(master, atLeastOnce()).lockHeartbeat(any(), lockHeartbeatReqArgCaptor.capture());
for (LockHeartbeatRequest req : lockHeartbeatReqArgCaptor.getAllValues()) {
assertEquals(procId, req.getProcId());
}
}
/**
* Test that abort is called when lock times out.
*/
@Test
public void testEntityLockTimeout() throws Exception {
final long workerSleepTime = 200; // in ms
Abortable abortable = Mockito.mock(Abortable.class);
EntityLock lock = admin.namespaceLock("namespace", "description", abortable);
lock.setTestingSleepTime(workerSleepTime);
when(master.requestLock(any(), any()))
.thenReturn(LockResponse.newBuilder().setProcId(procId).build());
// Acquires the lock, but then it times out (since we don't call unlock() on it).
when(master.lockHeartbeat(any(), any()))
.thenReturn(LOCKED_RESPONSE, UNLOCKED_RESPONSE);
lock.requestLock();
lock.await();
assertTrue(lock.isLocked());
// Should get unlocked in next heartbeat i.e. after workerSleepTime. Wait 2x time.
assertTrue(waitLockTimeOut(lock, 2 * workerSleepTime));
assertFalse(lock.getWorker().isAlive());
verify(abortable, times(1)).abort(any(), eq(null));
}
/**
* Test that abort is called when lockHeartbeat fails with IOException.
*/
@Test
public void testHeartbeatException() throws Exception {
final long workerSleepTime = 100; // in ms
Abortable abortable = Mockito.mock(Abortable.class);
EntityLock lock = admin.namespaceLock("namespace", "description", abortable);
lock.setTestingSleepTime(workerSleepTime);
when(master.requestLock(any(), any()))
.thenReturn(LockResponse.newBuilder().setProcId(procId).build());
when(master.lockHeartbeat(any(), any()))
.thenReturn(LOCKED_RESPONSE)
.thenThrow(new ServiceException("Failed heartbeat!"));
lock.requestLock();
lock.await();
assertTrue(waitLockTimeOut(lock, 100 * workerSleepTime));
verify(abortable, times(1)).abort(any(), isA(HBaseIOException.class));
assertFalse(lock.getWorker().isAlive());
}
}
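For reference, the client-side flow these mocks exercise: obtain an EntityLock from a LockServiceClient, request it, wait until it is granted, and unlock when done. How the LockServiceClient is wired to a real master (the lockClient and abortable below) is an assumption and exception handling is elided; only the EntityLock calls themselves are exercised by the test above.

  // Sketch: acquire and release a namespace lock from a client/regionserver process.
  final EntityLock nsLock = lockClient.namespaceLock("ns", "example: quiesce namespace", abortable);
  nsLock.requestLock();  // queues a LockProcedure on the master
  nsLock.await();        // blocks until the lock is granted; a worker thread then heartbeats
  try {
    // ... work that needs the namespace locked ...
  } finally {
    nsLock.unlock();     // stops the heartbeat worker and releases the lock on the master
  }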

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
@ -55,15 +56,17 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -177,6 +180,10 @@ public class TestMasterObserver {
private boolean postDispatchMergeCalled;
private boolean preMergeRegionsCalled;
private boolean postMergeRegionsCalled;
private boolean preRequestLockCalled;
private boolean postRequestLockCalled;
private boolean preLockHeartbeatCalled;
private boolean postLockHeartbeatCalled;
public void enableBypass(boolean bypass) {
this.bypass = bypass;
@ -265,6 +272,10 @@ public class TestMasterObserver {
postDispatchMergeCalled = false;
preMergeRegionsCalled = false;
postMergeRegionsCalled = false;
preRequestLockCalled = false;
postRequestLockCalled = false;
preLockHeartbeatCalled = false;
postLockHeartbeatCalled = false;
}
@Override
@ -1497,7 +1508,38 @@ public class TestMasterObserver {
@Override
public void postBalanceRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
String groupName, boolean balancerRan) throws IOException {
String groupName, boolean balancerRan) throws IOException {
}
@Override
public void preRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace,
TableName tableName, HRegionInfo[] regionInfos, LockProcedure.LockType type,
String description) throws IOException {
preRequestLockCalled = true;
}
@Override
public void postRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace,
TableName tableName, HRegionInfo[] regionInfos, LockProcedure.LockType type,
String description) throws IOException {
postRequestLockCalled = true;
}
@Override
public void preLockHeartbeat(ObserverContext<MasterCoprocessorEnvironment> ctx,
LockProcedure proc, boolean keepAlive) throws IOException {
preLockHeartbeatCalled = true;
}
@Override
public void postLockHeartbeat(ObserverContext<MasterCoprocessorEnvironment> ctx,
LockProcedure proc, boolean keepAlive) throws IOException {
postLockHeartbeatCalled = true;
}
public boolean preAndPostForQueueLockAndHeartbeatLockCalled() {
return preRequestLockCalled && postRequestLockCalled && preLockHeartbeatCalled &&
postLockHeartbeatCalled;
}
@Override
@ -2121,4 +2163,22 @@ public class TestMasterObserver {
tableDeletionLatch.await();
tableDeletionLatch = new CountDownLatch(1);
}
@Test
public void testQueueLockAndLockHeartbeatOperations() throws Exception {
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
CPMasterObserver cp = (CPMasterObserver)master.getMasterCoprocessorHost().findCoprocessor(
CPMasterObserver.class.getName());
cp.resetStates();
final TableName tableName = TableName.valueOf("testLockedTable");
long procId = master.getLockManager().remoteLocks().requestTableLock(tableName,
LockProcedure.LockType.EXCLUSIVE, "desc", HConstants.NO_NONCE, HConstants.NO_NONCE);
master.getLockManager().remoteLocks().lockHeartbeat(procId, false);
assertTrue(cp.preAndPostForQueueLockAndHeartbeatLockCalled());
ProcedureTestingUtility.waitNoProcedureRunning(master.getMasterProcedureExecutor());
ProcedureTestingUtility.assertProcNotFailed(master.getMasterProcedureExecutor(), procId);
}
}
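The new observer hooks exercised above can also be used to veto or audit lock requests. A hedged sketch: inside an existing MasterObserver implementation, reject exclusive lock requests against a hypothetical protected namespace (the namespace name and policy are made up for illustration; only the hook signature comes from this patch).

  @Override
  public void preRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace,
      TableName tableName, HRegionInfo[] regionInfos, LockProcedure.LockType type,
      String description) throws IOException {
    // Deny exclusive locks on the (hypothetical) 'protected_ns' namespace.
    if ("protected_ns".equals(namespace) && type == LockProcedure.LockType.EXCLUSIVE) {
      throw new IOException("Exclusive locks on namespace 'protected_ns' are not allowed: "
          + description);
    }
  }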

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@ -379,7 +380,7 @@ public class MockNoopMasterServices implements MasterServices, Server {
return null;
}
@Override
@Override
public MasterProcedureManagerHost getMasterProcedureManagerHost() {
return null;
}
@ -432,4 +433,9 @@ public class MockNoopMasterServices implements MasterServices, Server {
throws ReplicationException, IOException {
return null;
}
@Override
public LockManager getLockManager() {
return null;
}
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;

View File

@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.locking;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.locking.TestLockProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@Category({MasterTests.class, SmallTests.class})
public class TestLockManager {
@Rule
public TestName testName = new TestName();
// crank this up if this test turns out to be flaky.
private static final int LOCAL_LOCKS_TIMEOUT = 1000;
private static final Log LOG = LogFactory.getLog(TestLockManager.class);
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final Configuration conf = UTIL.getConfiguration();
private static MasterServices masterServices;
private static String namespace = "namespace";
private static TableName tableName = TableName.valueOf(namespace, "table");
private static HRegionInfo[] tableRegions;
private static void setupConf(Configuration conf) {
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
conf.setBoolean("hbase.procedure.check.owner.set", false); // since rpc user will be null
conf.setInt(LockProcedure.LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF, LOCAL_LOCKS_TIMEOUT);
}
@BeforeClass
public static void setupCluster() throws Exception {
setupConf(UTIL.getConfiguration());
UTIL.startMiniCluster(1);
masterServices = UTIL.getMiniHBaseCluster().getMaster();
UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
UTIL.createTable(tableName, new byte[][]{"fam".getBytes()}, new byte[][] {"1".getBytes()});
List<HRegionInfo> regions = UTIL.getAdmin().getTableRegions(tableName);
assert regions.size() > 0;
tableRegions = new HRegionInfo[regions.size()];
regions.toArray(tableRegions);
}
@AfterClass
public static void cleanupTest() throws Exception {
try {
UTIL.shutdownMiniCluster();
} catch (Exception e) {
LOG.warn("failure shutting down cluster", e);
}
}
@After
public void tearDown() throws Exception {
for (ProcedureInfo procInfo : getMasterProcedureExecutor().listProcedures()) {
Procedure proc = getMasterProcedureExecutor().getProcedure(procInfo.getProcId());
if (proc instanceof LockProcedure) {
((LockProcedure) proc).unlock(getMasterProcedureExecutor().getEnvironment());
ProcedureTestingUtility.waitProcedure(getMasterProcedureExecutor(), proc);
}
}
assertEquals(0, getMasterProcedureExecutor().getEnvironment().getProcedureScheduler().size());
}
private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
}
/**
* Tests that basic lock functionality works.
*/
@Test
public void testMasterLockAcquire() throws Exception {
LockManager.MasterLock lock = masterServices.getLockManager().createMasterLock(namespace,
LockProcedure.LockType.EXCLUSIVE, "desc");
assertTrue(lock.tryAcquire(2000));
assertTrue(lock.getProc().isLocked());
lock.release();
assertEquals(null, lock.getProc());
}
/**
* Two locks try to acquire the lock on the same table; assert that the later one times out.
*/
@Test
public void testMasterLockAcquireTimeout() throws Exception {
LockManager.MasterLock lock = masterServices.getLockManager().createMasterLock(
tableName, LockProcedure.LockType.EXCLUSIVE, "desc");
LockManager.MasterLock lock2 = masterServices.getLockManager().createMasterLock(
tableName, LockProcedure.LockType.EXCLUSIVE, "desc");
assertTrue(lock.tryAcquire(2000));
assertFalse(lock2.tryAcquire(LOCAL_LOCKS_TIMEOUT/2)); // wait less than other lock's timeout
assertEquals(null, lock2.getProc());
lock.release();
assertTrue(lock2.tryAcquire(2000));
assertTrue(lock2.getProc().isLocked());
lock2.release();
}
/**
* Take a region lock, then try a table exclusive lock; the later one should time out.
*/
@Test
public void testMasterLockAcquireTimeoutRegionVsTableExclusive() throws Exception {
LockManager.MasterLock lock = masterServices.getLockManager().createMasterLock(
tableRegions, "desc");
LockManager.MasterLock lock2 = masterServices.getLockManager().createMasterLock(
tableName, LockProcedure.LockType.EXCLUSIVE, "desc");
assertTrue(lock.tryAcquire(2000));
assertFalse(lock2.tryAcquire(LOCAL_LOCKS_TIMEOUT/2)); // wait less than other lock's timeout
assertEquals(null, lock2.getProc());
lock.release();
assertTrue(lock2.tryAcquire(2000));
assertTrue(lock2.getProc().isLocked());
lock2.release();
}
}

View File

@ -0,0 +1,456 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.locking;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.*;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.hamcrest.core.IsInstanceOf;
import org.hamcrest.core.StringStartsWith;
import org.junit.rules.TestRule;
import org.junit.experimental.categories.Category;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestName;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@Category({MasterTests.class, SmallTests.class})
public class TestLockProcedure {
@Rule
public final TestRule timeout = CategoryBasedTimeout.builder().
withTimeout(this.getClass()).withLookingForStuckThread(true).build();
@Rule
public final ExpectedException exception = ExpectedException.none();
@Rule
public TestName testName = new TestName();
// crank this up if this test turns out to be flaky.
private static final int HEARTBEAT_TIMEOUT = 1000;
private static final int LOCAL_LOCKS_TIMEOUT = 2000;
private static final int ZK_EXPIRATION = 2 * HEARTBEAT_TIMEOUT;
private static final Log LOG = LogFactory.getLog(TestLockProcedure.class);
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static MasterRpcServices masterRpcService;
private static ProcedureExecutor<MasterProcedureEnv> procExec;
private static String namespace = "namespace";
private static TableName tableName1 = TableName.valueOf(namespace, "table1");
private static List<HRegionInfo> tableRegions1;
private static TableName tableName2 = TableName.valueOf(namespace, "table2");
private static List<HRegionInfo> tableRegions2;
private String testMethodName;
private static void setupConf(Configuration conf) {
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
conf.setBoolean("hbase.procedure.check.owner.set", false); // since rpc user will be null
conf.setInt(LockProcedure.REMOTE_LOCKS_TIMEOUT_MS_CONF, HEARTBEAT_TIMEOUT);
conf.setInt(LockProcedure.LOCAL_MASTER_LOCKS_TIMEOUT_MS_CONF, LOCAL_LOCKS_TIMEOUT);
conf.setInt(TableLockManager.TABLE_LOCK_EXPIRE_TIMEOUT, ZK_EXPIRATION);
}
@BeforeClass
public static void setupCluster() throws Exception {
setupConf(UTIL.getConfiguration());
UTIL.startMiniCluster(1);
UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
UTIL.createTable(tableName1, new byte[][]{"fam".getBytes()}, new byte[][] {"1".getBytes()});
UTIL.createTable(tableName2, new byte[][]{"fam".getBytes()}, new byte[][] {"1".getBytes()});
masterRpcService = UTIL.getHBaseCluster().getMaster().getMasterRpcServices();
procExec = UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
tableRegions1 = UTIL.getAdmin().getTableRegions(tableName1);
tableRegions2 = UTIL.getAdmin().getTableRegions(tableName2);
assert tableRegions1.size() > 0;
assert tableRegions2.size() > 0;
}
@AfterClass
public static void cleanupTest() throws Exception {
try {
UTIL.shutdownMiniCluster();
} catch (Exception e) {
LOG.warn("failure shutting down cluster", e);
}
}
@Before
public void setup() throws Exception {
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
testMethodName = testName.getMethodName();
}
@After
public void tearDown() throws Exception {
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
// Kill all running procedures.
for (ProcedureInfo procInfo : procExec.listProcedures()) {
Procedure proc = procExec.getProcedure(procInfo.getProcId());
if (proc == null) continue;
procExec.abort(procInfo.getProcId());
ProcedureTestingUtility.waitProcedure(procExec, proc);
}
assertEquals(0, procExec.getEnvironment().getProcedureScheduler().size());
}
private LockRequest getNamespaceLock(String namespace, String description) {
return LockServiceClient.buildLockRequest(LockType.EXCLUSIVE,
namespace, null, null, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
private LockRequest getTableExclusiveLock(TableName tableName, String description) {
return LockServiceClient.buildLockRequest(LockType.EXCLUSIVE,
null, tableName, null, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
private LockRequest getRegionLock(List<HRegionInfo> regionInfos, String description) {
return LockServiceClient.buildLockRequest(LockType.EXCLUSIVE,
null, null, regionInfos, description, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
private void validateLockRequestException(LockRequest lockRequest, String message)
throws Exception {
exception.expect(ServiceException.class);
exception.expectCause(IsInstanceOf.instanceOf(DoNotRetryIOException.class));
exception.expectMessage(
StringStartsWith.startsWith("org.apache.hadoop.hbase.DoNotRetryIOException: "
+ "java.lang.IllegalArgumentException: " + message));
masterRpcService.requestLock(null, lockRequest);
}
@Test
public void testLockRequestValidationEmptyDescription() throws Exception {
validateLockRequestException(getNamespaceLock("", ""), "Empty description");
}
@Test
public void testLockRequestValidationEmptyNamespaceName() throws Exception {
validateLockRequestException(getNamespaceLock("", "desc"), "Empty namespace");
}
@Test
public void testLockRequestValidationRegionsFromDifferentTable() throws Exception {
List<HRegionInfo> regions = new ArrayList<>();
regions.addAll(tableRegions1);
regions.addAll(tableRegions2);
validateLockRequestException(getRegionLock(regions, "desc"),
"All regions should be from same table");
}
/**
* Polls the lock heartbeat until the lock is acquired or the timeout elapses.
* @return true if the lock was acquired within timeoutInMs, false otherwise.
*/
private boolean awaitForLocked(long procId, long timeoutInMs) throws Exception {
long deadline = System.currentTimeMillis() + timeoutInMs;
while (System.currentTimeMillis() < deadline) {
LockHeartbeatResponse response = masterRpcService.lockHeartbeat(null,
LockHeartbeatRequest.newBuilder().setProcId(procId).build());
if (response.getLockStatus() == LockHeartbeatResponse.LockStatus.LOCKED) {
assertEquals(response.getTimeoutMs(), HEARTBEAT_TIMEOUT);
LOG.debug(String.format("Proc id %s acquired lock.", procId));
return true;
}
Thread.sleep(100);
}
return false;
}
private long queueLock(LockRequest lockRequest) throws ServiceException {
LockResponse response = masterRpcService.requestLock(null, lockRequest);
return response.getProcId();
}
private void sendHeartbeatAndCheckLocked(long procId, boolean isLocked) throws ServiceException {
LockHeartbeatResponse response = masterRpcService.lockHeartbeat(null,
LockHeartbeatRequest.newBuilder().setProcId(procId).build());
if (isLocked) {
assertEquals(LockHeartbeatResponse.LockStatus.LOCKED, response.getLockStatus());
} else {
assertEquals(LockHeartbeatResponse.LockStatus.UNLOCKED, response.getLockStatus());
}
LOG.debug(String.format("Proc id %s : %s.", procId, response.getLockStatus()));
}
private void releaseLock(long procId) throws ServiceException {
masterRpcService.lockHeartbeat(null,
LockHeartbeatRequest.newBuilder().setProcId(procId).setKeepAlive(false).build());
}
@Test
public void testUpdateHeartbeatAndUnlockForTable() throws Exception {
LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
final long procId = queueLock(lock);
assertTrue(awaitForLocked(procId, 2000));
Thread.sleep(HEARTBEAT_TIMEOUT /2);
sendHeartbeatAndCheckLocked(procId, true);
Thread.sleep(HEARTBEAT_TIMEOUT /2);
sendHeartbeatAndCheckLocked(procId, true);
Thread.sleep(HEARTBEAT_TIMEOUT /2);
sendHeartbeatAndCheckLocked(procId, true);
releaseLock(procId);
sendHeartbeatAndCheckLocked(procId, false);
ProcedureTestingUtility.waitProcedure(procExec, procId);
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
}
@Test
public void testAbort() throws Exception {
LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
final long procId = queueLock(lock);
assertTrue(awaitForLocked(procId, 2000));
assertTrue(procExec.abort(procId));
sendHeartbeatAndCheckLocked(procId, false);
ProcedureTestingUtility.waitProcedure(procExec, procId);
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
}
@Test
public void testUpdateHeartbeatAndUnlockForNamespace() throws Exception {
LockRequest lock = getNamespaceLock(namespace, testMethodName);
final long procId = queueLock(lock);
assertTrue(awaitForLocked(procId, 2000));
Thread.sleep(HEARTBEAT_TIMEOUT /2);
sendHeartbeatAndCheckLocked(procId, true);
Thread.sleep(HEARTBEAT_TIMEOUT /2);
sendHeartbeatAndCheckLocked(procId, true);
Thread.sleep(HEARTBEAT_TIMEOUT /2);
sendHeartbeatAndCheckLocked(procId, true);
releaseLock(procId);
sendHeartbeatAndCheckLocked(procId, false);
ProcedureTestingUtility.waitProcedure(procExec, procId);
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
}
@Test
public void testTimeout() throws Exception {
LockRequest lock = getNamespaceLock(namespace, testMethodName);
final long procId = queueLock(lock);
assertTrue(awaitForLocked(procId, 2000));
Thread.sleep(HEARTBEAT_TIMEOUT / 2);
sendHeartbeatAndCheckLocked(procId, true);
Thread.sleep(HEARTBEAT_TIMEOUT / 2);
sendHeartbeatAndCheckLocked(procId, true);
Thread.sleep(2 * HEARTBEAT_TIMEOUT);
sendHeartbeatAndCheckLocked(procId, false);
ProcedureTestingUtility.waitProcedure(procExec, procId);
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
}
@Test
public void testMultipleLocks() throws Exception {
LockRequest nsLock = getNamespaceLock(namespace, testMethodName);
LockRequest tableLock1 = getTableExclusiveLock(tableName1, testMethodName);
LockRequest tableLock2 = getTableExclusiveLock(tableName2, testMethodName);
LockRequest regionsLock1 = getRegionLock(tableRegions1, testMethodName);
LockRequest regionsLock2 = getRegionLock(tableRegions2, testMethodName);
// Acquire namespace lock, then queue other locks.
long nsProcId = queueLock(nsLock);
assertTrue(awaitForLocked(nsProcId, 2000));
sendHeartbeatAndCheckLocked(nsProcId, true);
long table1ProcId = queueLock(tableLock1);
long table2ProcId = queueLock(tableLock2);
long regions1ProcId = queueLock(regionsLock1);
long regions2ProcId = queueLock(regionsLock2);
// Assert tables & region locks are waiting because of namespace lock.
Thread.sleep(HEARTBEAT_TIMEOUT / 2);
sendHeartbeatAndCheckLocked(nsProcId, true);
sendHeartbeatAndCheckLocked(table1ProcId, false);
sendHeartbeatAndCheckLocked(table2ProcId, false);
sendHeartbeatAndCheckLocked(regions1ProcId, false);
sendHeartbeatAndCheckLocked(regions2ProcId, false);
// Release namespace lock and assert tables locks are acquired but not region lock
releaseLock(nsProcId);
assertTrue(awaitForLocked(table1ProcId, 2000));
assertTrue(awaitForLocked(table2ProcId, 2000));
sendHeartbeatAndCheckLocked(regions1ProcId, false);
sendHeartbeatAndCheckLocked(regions2ProcId, false);
// Release table1 lock and assert region lock is acquired.
releaseLock(table1ProcId);
sendHeartbeatAndCheckLocked(table1ProcId, false);
assertTrue(awaitForLocked(regions1ProcId, 2000));
sendHeartbeatAndCheckLocked(table2ProcId, true);
sendHeartbeatAndCheckLocked(regions2ProcId, false);
// Release table2 lock and assert region lock is acquired.
releaseLock(table2ProcId);
sendHeartbeatAndCheckLocked(table2ProcId, false);
assertTrue(awaitForLocked(regions2ProcId, 2000));
sendHeartbeatAndCheckLocked(regions1ProcId, true);
sendHeartbeatAndCheckLocked(regions2ProcId, true);
// Release region locks.
releaseLock(regions1ProcId);
releaseLock(regions2ProcId);
sendHeartbeatAndCheckLocked(regions1ProcId, false);
sendHeartbeatAndCheckLocked(regions2ProcId, false);
ProcedureTestingUtility.waitAllProcedures(procExec);
ProcedureTestingUtility.assertProcNotFailed(procExec, nsProcId);
ProcedureTestingUtility.assertProcNotFailed(procExec, table1ProcId);
ProcedureTestingUtility.assertProcNotFailed(procExec, table2ProcId);
ProcedureTestingUtility.assertProcNotFailed(procExec, regions1ProcId);
ProcedureTestingUtility.assertProcNotFailed(procExec, regions2ProcId);
}
// Test that the latch is counted down when the lock is acquired.
@Test
public void testLatch() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
// MasterRpcServices don't set latch with LockProcedure, so create one and submit it directly.
LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(),
TableName.valueOf("table"), LockProcedure.LockType.EXCLUSIVE, "desc", latch);
procExec.submitProcedure(lockProc);
assertTrue(latch.await(2000, TimeUnit.MILLISECONDS));
releaseLock(lockProc.getProcId());
ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
}
// LockProcedures with latch are considered local locks.
@Test
public void testLocalLockTimeout() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
// MasterRpcServices don't set latch with LockProcedure, so create one and submit it directly.
LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(),
TableName.valueOf("table"), LockProcedure.LockType.EXCLUSIVE, "desc", latch);
procExec.submitProcedure(lockProc);
assertTrue(awaitForLocked(lockProc.getProcId(), 2000));
Thread.sleep(LOCAL_LOCKS_TIMEOUT / 2);
assertTrue(lockProc.isLocked());
Thread.sleep(2 * LOCAL_LOCKS_TIMEOUT);
assertFalse(lockProc.isLocked());
releaseLock(lockProc.getProcId());
ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
}
private void testRemoteLockRecovery(LockRequest lock) throws Exception {
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
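// With kill-and-toggle enabled, the executor kills itself before persisting the next procedure
// state update, simulating a master crash while the lock is held.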
final long procId = queueLock(lock);
assertTrue(awaitForLocked(procId, 2000));
// Wait for the ProcedureExecutor to die, then restart it and wait for the LockProcedure to start.
ProcedureTestingUtility.waitProcedure(procExec, procId);
assertFalse(procExec.isRunning());
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
// Remove the zk lock nodes, otherwise the recovered lock will keep waiting on them. Remove
// both the exclusive and non-exclusive locks (the shared table lock that the region lock takes).
// Have to pause to let the locks 'expire' up in zk. See the configs above where we set an
// explicit zk timeout on locks.
Thread.sleep(ZK_EXPIRATION + HEARTBEAT_TIMEOUT);
UTIL.getMiniHBaseCluster().getMaster().getTableLockManager().reapAllExpiredLocks();
ProcedureTestingUtility.restart(procExec);
while (!procExec.isStarted(procId)) {
Thread.sleep(250);
}
assertTrue(procExec.isRunning());
// After recovery, the remote lock should be reacquired and function normally.
assertTrue(awaitForLocked(procId, 2000));
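// Heartbeats sent within HEARTBEAT_TIMEOUT keep the reacquired lock alive; once heartbeats
// stop for 2 * HEARTBEAT_TIMEOUT the lock should be released.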
Thread.sleep(HEARTBEAT_TIMEOUT / 2);
sendHeartbeatAndCheckLocked(procId, true);
Thread.sleep(HEARTBEAT_TIMEOUT / 2);
sendHeartbeatAndCheckLocked(procId, true);
Thread.sleep(2 * HEARTBEAT_TIMEOUT);
sendHeartbeatAndCheckLocked(procId, false);
ProcedureTestingUtility.waitProcedure(procExec, procId);
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
}
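// Exercise remote lock recovery for each lock type: table, namespace, and region.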
@Test(timeout = 20000)
public void testRemoteTableLockRecovery() throws Exception {
LockRequest lock = getTableExclusiveLock(tableName1, testMethodName);
testRemoteLockRecovery(lock);
}
@Test(timeout = 20000)
public void testRemoteNamespaceLockRecovery() throws Exception {
LockRequest lock = getNamespaceLock(namespace, testMethodName);
testRemoteLockRecovery(lock);
}
@Test(timeout = 20000)
public void testRemoteRegionLockRecovery() throws Exception {
LockRequest lock = getRegionLock(tableRegions1, testMethodName);
testRemoteLockRecovery(lock);
}
@Test(timeout = 20000)
public void testLocalMasterLockRecovery() throws Exception {
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
CountDownLatch latch = new CountDownLatch(1);
LockProcedure lockProc = new LockProcedure(UTIL.getConfiguration(),
TableName.valueOf("table"), LockProcedure.LockType.EXCLUSIVE, "desc", latch);
procExec.submitProcedure(lockProc);
assertTrue(latch.await(2000, TimeUnit.MILLISECONDS));
// Wait for the ProcedureExecutor to die, then restart it and wait for the LockProcedure to start.
ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
assertFalse(procExec.isRunning());
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
// Remove the zk lock node, otherwise the recovered lock will keep waiting on it.
UTIL.getMiniHBaseCluster().getMaster().getTableLockManager().reapWriteLocks();
ProcedureTestingUtility.restart(procExec);
while (!procExec.isStarted(lockProc.getProcId())) {
Thread.sleep(250);
}
assertTrue(procExec.isRunning());
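// A local (latch-based) lock is not reacquired after a restart: the recovered procedure should
// either be gone or no longer hold the lock.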
LockProcedure proc = (LockProcedure) procExec.getProcedure(lockProc.getProcId());
assertTrue(proc == null || !proc.isLocked());
ProcedureTestingUtility.waitProcedure(procExec, lockProc.getProcId());
ProcedureTestingUtility.assertProcNotFailed(procExec, lockProc.getProcId());
}
}