HBASE-16831 Procedure V2 - Remove org.apache.hadoop.hbase.zookeeper.lock
(Appy)
This commit is contained in:
parent
558a6bb9d7
commit
4fdd6ff9ae
File diff suppressed because it is too large
Load Diff
|
@ -105,18 +105,6 @@ message DeprecatedTableState {
|
|||
required State state = 1 [default = ENABLED];
|
||||
}
|
||||
|
||||
/**
|
||||
* Metadata associated with a table lock in zookeeper
|
||||
*/
|
||||
message TableLock {
|
||||
optional TableName table_name = 1;
|
||||
optional ServerName lock_owner = 2;
|
||||
optional int64 thread_id = 3;
|
||||
optional bool is_shared = 4;
|
||||
optional string purpose = 5;
|
||||
optional int64 create_time = 6;
|
||||
}
|
||||
|
||||
/**
|
||||
* State of the switch.
|
||||
*/
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -143,18 +143,6 @@ message ReplicationHLogPosition {
|
|||
required int64 position = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Metadata associated with a table lock in zookeeper
|
||||
*/
|
||||
message TableLock {
|
||||
optional TableName table_name = 1;
|
||||
optional ServerName lock_owner = 2;
|
||||
optional int64 thread_id = 3;
|
||||
optional bool is_shared = 4;
|
||||
optional string purpose = 5;
|
||||
optional int64 create_time = 6;
|
||||
}
|
||||
|
||||
/**
|
||||
* State of the switch.
|
||||
*/
|
||||
|
|
|
@ -1,105 +0,0 @@
|
|||
/**
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* An interface for an application-specific lock.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface InterProcessLock {
|
||||
|
||||
/**
|
||||
* Acquire the lock, waiting indefinitely until the lock is released or
|
||||
* the thread is interrupted.
|
||||
* @throws IOException If there is an unrecoverable error releasing the lock
|
||||
* @throws InterruptedException If current thread is interrupted while
|
||||
* waiting for the lock
|
||||
*/
|
||||
void acquire() throws IOException, InterruptedException;
|
||||
|
||||
/**
|
||||
* Acquire the lock within a wait time.
|
||||
* @param timeoutMs The maximum time (in milliseconds) to wait for the lock,
|
||||
* -1 to wait indefinitely
|
||||
* @return True if the lock was acquired, false if waiting time elapsed
|
||||
* before the lock was acquired
|
||||
* @throws IOException If there is an unrecoverable error talking talking
|
||||
* (e.g., when talking to a lock service) when acquiring
|
||||
* the lock
|
||||
* @throws InterruptedException If the thread is interrupted while waiting to
|
||||
* acquire the lock
|
||||
*/
|
||||
boolean tryAcquire(long timeoutMs)
|
||||
throws IOException, InterruptedException;
|
||||
|
||||
/**
|
||||
* Release the lock.
|
||||
* @throws IOException If there is an unrecoverable error releasing the lock
|
||||
* @throws InterruptedException If the thread is interrupted while releasing
|
||||
* the lock
|
||||
*/
|
||||
void release() throws IOException, InterruptedException;
|
||||
|
||||
/**
|
||||
* If supported, attempts to reap all the locks of this type by forcefully
|
||||
* deleting the locks (both held and attempted) that have expired according
|
||||
* to the given timeout. Lock reaping is different than coordinated lock revocation
|
||||
* in that, there is no coordination, and the behavior is undefined if the
|
||||
* lock holder is still alive.
|
||||
* @throws IOException If there is an unrecoverable error reaping the locks
|
||||
*/
|
||||
void reapExpiredLocks(long expireTimeoutMs) throws IOException;
|
||||
|
||||
/**
|
||||
* If supported, attempts to reap all the locks of this type by forcefully
|
||||
* deleting the locks (both held and attempted). Lock reaping is different
|
||||
* than coordinated lock revocation in that, there is no coordination, and
|
||||
* the behavior is undefined if the lock holder is still alive.
|
||||
* Calling this should have the same affect as calling {@link #reapExpiredLocks(long)}
|
||||
* with timeout=0.
|
||||
* @throws IOException If there is an unrecoverable error reaping the locks
|
||||
*/
|
||||
void reapAllLocks() throws IOException;
|
||||
|
||||
/**
|
||||
* An interface for objects that process lock metadata.
|
||||
*/
|
||||
interface MetadataHandler {
|
||||
|
||||
/**
|
||||
* Called after lock metadata is successfully read from a distributed
|
||||
* lock service. This method may contain any procedures for, e.g.,
|
||||
* printing the metadata in a humanly-readable format.
|
||||
* @param metadata The metadata
|
||||
*/
|
||||
void handleMetadata(byte[] metadata);
|
||||
}
|
||||
|
||||
/**
|
||||
* Visits the locks (both held and attempted) of this type with the given
|
||||
* {@link MetadataHandler}.
|
||||
* @throws IOException If there is an unrecoverable error
|
||||
*/
|
||||
void visitLocks(MetadataHandler handler) throws IOException;
|
||||
}
|
|
@ -1,47 +0,0 @@
|
|||
/**
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* An interface for a distributed reader-writer lock.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface InterProcessReadWriteLock {
|
||||
|
||||
/**
|
||||
* Obtain a read lock containing given metadata.
|
||||
* @param metadata Serialized lock metadata (this may contain information
|
||||
* such as the process owning the lock or the purpose for
|
||||
* which the lock was acquired).
|
||||
* @return An instantiated InterProcessLock instance
|
||||
*/
|
||||
InterProcessLock readLock(byte[] metadata);
|
||||
|
||||
/**
|
||||
* Obtain a write lock containing given metadata.
|
||||
* @param metadata Serialized lock metadata (this may contain information
|
||||
* such as the process owning the lock or the purpose for
|
||||
* which the lock was acquired).
|
||||
* @return An instantiated InterProcessLock instance
|
||||
*/
|
||||
InterProcessLock writeLock(byte[] metadata);
|
||||
}
|
|
@ -1,459 +0,0 @@
|
|||
/**
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper.lock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.InterProcessLock;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.zookeeper.DeletionListener;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.KeeperException.BadVersionException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* ZooKeeper based HLock implementation. Based on the Shared Locks recipe.
|
||||
* (see:
|
||||
* <a href="http://zookeeper.apache.org/doc/trunk/recipes.html">
|
||||
* ZooKeeper Recipes and Solutions
|
||||
* </a>)
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class ZKInterProcessLockBase implements InterProcessLock {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ZKInterProcessLockBase.class);
|
||||
|
||||
/** ZNode prefix used by processes acquiring reader locks */
|
||||
protected static final String READ_LOCK_CHILD_NODE_PREFIX = "read-";
|
||||
|
||||
/** ZNode prefix used by processes acquiring writer locks */
|
||||
protected static final String WRITE_LOCK_CHILD_NODE_PREFIX = "write-";
|
||||
|
||||
protected final ZooKeeperWatcher zkWatcher;
|
||||
protected final String parentLockNode;
|
||||
protected final String fullyQualifiedZNode;
|
||||
protected final String childZNode;
|
||||
protected final byte[] metadata;
|
||||
protected final MetadataHandler handler;
|
||||
|
||||
// If we acquire a lock, update this field
|
||||
protected final AtomicReference<AcquiredLock> acquiredLock =
|
||||
new AtomicReference<AcquiredLock>(null);
|
||||
|
||||
/**
|
||||
* Represents information about a lock held by this thread.
|
||||
*/
|
||||
protected static class AcquiredLock {
|
||||
private final String path;
|
||||
private final int version;
|
||||
|
||||
/**
|
||||
* Store information about a lock.
|
||||
* @param path The path to a lock's ZNode
|
||||
* @param version The current version of the lock's ZNode
|
||||
*/
|
||||
public AcquiredLock(String path, int version) {
|
||||
this.path = path;
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
public String getPath() {
|
||||
return path;
|
||||
}
|
||||
|
||||
public int getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "AcquiredLockInfo{" +
|
||||
"path='" + path + '\'' +
|
||||
", version=" + version +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
protected static class ZNodeComparator implements Comparator<String> {
|
||||
|
||||
public static final ZNodeComparator COMPARATOR = new ZNodeComparator();
|
||||
|
||||
private ZNodeComparator() {
|
||||
}
|
||||
|
||||
/** Parses sequenceId from the znode name. ZooKeeper documentation
|
||||
* states: The sequence number is always fixed length of 10 digits, 0 padded
|
||||
*/
|
||||
public static long getChildSequenceId(String childZNode) {
|
||||
Preconditions.checkNotNull(childZNode);
|
||||
assert childZNode.length() >= 10;
|
||||
String sequenceIdStr = childZNode.substring(childZNode.length() - 10);
|
||||
return Long.parseLong(sequenceIdStr);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compare(String zNode1, String zNode2) {
|
||||
long seq1 = getChildSequenceId(zNode1);
|
||||
long seq2 = getChildSequenceId(zNode2);
|
||||
if (seq1 == seq2) {
|
||||
return 0;
|
||||
} else {
|
||||
return seq1 < seq2 ? -1 : 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by implementing classes.
|
||||
* @param zkWatcher
|
||||
* @param parentLockNode The lock ZNode path
|
||||
* @param metadata
|
||||
* @param handler
|
||||
* @param childNode The prefix for child nodes created under the parent
|
||||
*/
|
||||
protected ZKInterProcessLockBase(ZooKeeperWatcher zkWatcher,
|
||||
String parentLockNode, byte[] metadata, MetadataHandler handler, String childNode) {
|
||||
this.zkWatcher = zkWatcher;
|
||||
this.parentLockNode = parentLockNode;
|
||||
this.fullyQualifiedZNode = ZKUtil.joinZNode(parentLockNode, childNode);
|
||||
this.metadata = metadata;
|
||||
this.handler = handler;
|
||||
this.childZNode = childNode;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void acquire() throws IOException, InterruptedException {
|
||||
tryAcquire(-1);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public boolean tryAcquire(long timeoutMs)
|
||||
throws IOException, InterruptedException {
|
||||
boolean hasTimeout = timeoutMs != -1;
|
||||
long waitUntilMs =
|
||||
hasTimeout ?EnvironmentEdgeManager.currentTime() + timeoutMs : -1;
|
||||
String createdZNode;
|
||||
try {
|
||||
createdZNode = createLockZNode();
|
||||
} catch (KeeperException ex) {
|
||||
throw new IOException("Failed to create znode: " + fullyQualifiedZNode, ex);
|
||||
}
|
||||
while (true) {
|
||||
List<String> children;
|
||||
try {
|
||||
children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Unexpected ZooKeeper error when listing children", e);
|
||||
throw new IOException("Unexpected ZooKeeper exception", e);
|
||||
}
|
||||
String pathToWatch;
|
||||
if ((pathToWatch = getLockPath(createdZNode, children)) == null) {
|
||||
break;
|
||||
}
|
||||
CountDownLatch deletedLatch = new CountDownLatch(1);
|
||||
String zkPathToWatch =
|
||||
ZKUtil.joinZNode(parentLockNode, pathToWatch);
|
||||
DeletionListener deletionListener =
|
||||
new DeletionListener(zkWatcher, zkPathToWatch, deletedLatch);
|
||||
zkWatcher.registerListener(deletionListener);
|
||||
try {
|
||||
if (ZKUtil.setWatchIfNodeExists(zkWatcher, zkPathToWatch)) {
|
||||
// Wait for the watcher to fire
|
||||
if (hasTimeout) {
|
||||
long remainingMs = waitUntilMs - EnvironmentEdgeManager.currentTime();
|
||||
if (remainingMs < 0 ||
|
||||
!deletedLatch.await(remainingMs, TimeUnit.MILLISECONDS)) {
|
||||
LOG.warn("Unable to acquire the lock in " + timeoutMs +
|
||||
" milliseconds.");
|
||||
try {
|
||||
ZKUtil.deleteNode(zkWatcher, createdZNode);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Unable to remove ZNode " + createdZNode);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
deletedLatch.await();
|
||||
}
|
||||
if (deletionListener.hasException()) {
|
||||
Throwable t = deletionListener.getException();
|
||||
throw new IOException("Exception in the watcher", t);
|
||||
}
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Unexpected ZooKeeper exception", e);
|
||||
} finally {
|
||||
zkWatcher.unregisterListener(deletionListener);
|
||||
}
|
||||
}
|
||||
updateAcquiredLock(createdZNode);
|
||||
LOG.debug("Acquired a lock for " + createdZNode);
|
||||
return true;
|
||||
}
|
||||
|
||||
private String createLockZNode() throws KeeperException {
|
||||
try {
|
||||
return ZKUtil.createNodeIfNotExistsNoWatch(zkWatcher, fullyQualifiedZNode,
|
||||
metadata, CreateMode.EPHEMERAL_SEQUENTIAL);
|
||||
} catch (KeeperException.NoNodeException nne) {
|
||||
//create parents, retry
|
||||
ZKUtil.createWithParents(zkWatcher, parentLockNode);
|
||||
return createLockZNode();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a child znode represents a read lock.
|
||||
* @param child The child znode we want to check.
|
||||
* @return whether the child znode represents a read lock
|
||||
*/
|
||||
protected static boolean isChildReadLock(String child) {
|
||||
int idx = child.lastIndexOf(ZKUtil.ZNODE_PATH_SEPARATOR);
|
||||
String suffix = child.substring(idx + 1);
|
||||
return suffix.startsWith(READ_LOCK_CHILD_NODE_PREFIX);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a child znode represents a write lock.
|
||||
* @param child The child znode we want to check.
|
||||
* @return whether the child znode represents a write lock
|
||||
*/
|
||||
protected static boolean isChildWriteLock(String child) {
|
||||
int idx = child.lastIndexOf(ZKUtil.ZNODE_PATH_SEPARATOR);
|
||||
String suffix = child.substring(idx + 1);
|
||||
return suffix.startsWith(WRITE_LOCK_CHILD_NODE_PREFIX);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a child znode represents the same type(read or write) of lock
|
||||
* @param child The child znode we want to check.
|
||||
* @return whether the child znode represents the same type(read or write) of lock
|
||||
*/
|
||||
protected boolean isChildOfSameType(String child) {
|
||||
int idx = child.lastIndexOf(ZKUtil.ZNODE_PATH_SEPARATOR);
|
||||
String suffix = child.substring(idx + 1);
|
||||
return suffix.startsWith(this.childZNode);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update state as to indicate that a lock is held
|
||||
* @param createdZNode The lock znode
|
||||
* @throws IOException If an unrecoverable ZooKeeper error occurs
|
||||
*/
|
||||
protected void updateAcquiredLock(String createdZNode) throws IOException {
|
||||
Stat stat = new Stat();
|
||||
byte[] data = null;
|
||||
Exception ex = null;
|
||||
try {
|
||||
data = ZKUtil.getDataNoWatch(zkWatcher, createdZNode, stat);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Cannot getData for znode:" + createdZNode, e);
|
||||
ex = e;
|
||||
}
|
||||
if (data == null) {
|
||||
LOG.error("Can't acquire a lock on a non-existent node " + createdZNode);
|
||||
throw new IllegalStateException("ZNode " + createdZNode +
|
||||
"no longer exists!", ex);
|
||||
}
|
||||
AcquiredLock newLock = new AcquiredLock(createdZNode, stat.getVersion());
|
||||
if (!acquiredLock.compareAndSet(null, newLock)) {
|
||||
LOG.error("The lock " + fullyQualifiedZNode +
|
||||
" has already been acquired by another process!");
|
||||
throw new IllegalStateException(fullyQualifiedZNode +
|
||||
" is held by another process");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void release() throws IOException, InterruptedException {
|
||||
AcquiredLock lock = acquiredLock.get();
|
||||
if (lock == null) {
|
||||
LOG.error("Cannot release lock" +
|
||||
", process does not have a lock for " + fullyQualifiedZNode);
|
||||
throw new IllegalStateException("No lock held for " + fullyQualifiedZNode);
|
||||
}
|
||||
try {
|
||||
if (ZKUtil.checkExists(zkWatcher, lock.getPath()) != -1) {
|
||||
boolean ret = ZKUtil.deleteNode(zkWatcher, lock.getPath(), lock.getVersion());
|
||||
if (!ret && ZKUtil.checkExists(zkWatcher, lock.getPath()) != -1) {
|
||||
throw new IllegalStateException("Couldn't delete " + lock.getPath());
|
||||
}
|
||||
if (!acquiredLock.compareAndSet(lock, null)) {
|
||||
LOG.debug("Current process no longer holds " + lock + " for " +
|
||||
fullyQualifiedZNode);
|
||||
throw new IllegalStateException("Not holding a lock for " +
|
||||
fullyQualifiedZNode +"!");
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Released " + lock.getPath());
|
||||
}
|
||||
} catch (BadVersionException e) {
|
||||
throw new IllegalStateException(e);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process metadata stored in a ZNode using a callback
|
||||
* <p>
|
||||
* @param lockZNode The node holding the metadata
|
||||
* @return True if metadata was ready and processed, false otherwise.
|
||||
*/
|
||||
protected boolean handleLockMetadata(String lockZNode) {
|
||||
return handleLockMetadata(lockZNode, handler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process metadata stored in a ZNode using a callback object passed to
|
||||
* this instance.
|
||||
* <p>
|
||||
* @param lockZNode The node holding the metadata
|
||||
* @param handler the metadata handler
|
||||
* @return True if metadata was ready and processed, false on exception.
|
||||
*/
|
||||
protected boolean handleLockMetadata(String lockZNode, MetadataHandler handler) {
|
||||
if (handler == null) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
byte[] metadata = ZKUtil.getData(zkWatcher, lockZNode);
|
||||
handler.handleMetadata(metadata);
|
||||
} catch (KeeperException ex) {
|
||||
LOG.warn("Error processing lock metadata in " + lockZNode);
|
||||
return false;
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("InterruptedException processing lock metadata in " + lockZNode);
|
||||
Thread.currentThread().interrupt();
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reapAllLocks() throws IOException {
|
||||
reapExpiredLocks(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Will delete all lock znodes of this type (either read or write) which are "expired"
|
||||
* according to timeout. Assumption is that the clock skew between zookeeper and this servers
|
||||
* is negligible.
|
||||
* Referred in zk recipe as "Revocable Shared Locks with Freaking Laser Beams".
|
||||
* (http://zookeeper.apache.org/doc/trunk/recipes.html).
|
||||
*/
|
||||
public void reapExpiredLocks(long timeout) throws IOException {
|
||||
List<String> children;
|
||||
try {
|
||||
children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Unexpected ZooKeeper error when listing children", e);
|
||||
throw new IOException("Unexpected ZooKeeper exception", e);
|
||||
}
|
||||
if (children == null) return;
|
||||
|
||||
KeeperException deferred = null;
|
||||
Stat stat = new Stat();
|
||||
long expireDate = System.currentTimeMillis() - timeout; //we are using cTime in zookeeper
|
||||
for (String child : children) {
|
||||
if (isChildOfSameType(child)) {
|
||||
String znode = ZKUtil.joinZNode(parentLockNode, child);
|
||||
try {
|
||||
ZKUtil.getDataNoWatch(zkWatcher, znode, stat);
|
||||
if (stat.getCtime() < expireDate) {
|
||||
LOG.info("Reaping lock for znode:" + znode);
|
||||
ZKUtil.deleteNodeFailSilent(zkWatcher, znode);
|
||||
}
|
||||
} catch (KeeperException ex) {
|
||||
LOG.warn("Error reaping the znode for write lock :" + znode);
|
||||
deferred = ex;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (deferred != null) {
|
||||
throw new IOException("ZK exception while reaping locks:", deferred);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Visits the locks (both held and attempted) with the given MetadataHandler.
|
||||
* @throws IOException If there is an unrecoverable error
|
||||
*/
|
||||
public void visitLocks(MetadataHandler handler) throws IOException {
|
||||
List<String> children;
|
||||
try {
|
||||
children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Unexpected ZooKeeper error when listing children", e);
|
||||
throw new IOException("Unexpected ZooKeeper exception", e);
|
||||
}
|
||||
if (children != null && children.size() > 0) {
|
||||
for (String child : children) {
|
||||
if (isChildOfSameType(child)) {
|
||||
String znode = ZKUtil.joinZNode(parentLockNode, child);
|
||||
String childWatchesZNode = getLockPath(child, children);
|
||||
if (childWatchesZNode == null) {
|
||||
LOG.info("Lock is held by: " + child);
|
||||
}
|
||||
handleLockMetadata(znode, handler);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine based on a list of children under a ZNode, whether or not a
|
||||
* process which created a specified ZNode has obtained a lock. If a lock is
|
||||
* not obtained, return the path that we should watch awaiting its deletion.
|
||||
* Otherwise, return null.
|
||||
* This method is abstract as the logic for determining whether or not a
|
||||
* lock is obtained depends on the type of lock being implemented.
|
||||
* @param myZNode The ZNode created by the process attempting to acquire
|
||||
* a lock
|
||||
* @param children List of all child ZNodes under the lock's parent ZNode
|
||||
* @return The path to watch, or null if myZNode can represent a correctly
|
||||
* acquired lock.
|
||||
*/
|
||||
protected abstract String getLockPath(String myZNode, List<String> children)
|
||||
throws IOException;
|
||||
}
|
|
@ -1,73 +0,0 @@
|
|||
/**
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper.lock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
|
||||
/**
|
||||
* ZooKeeper based read lock: does not exclude other read locks, but excludes
|
||||
* and is excluded by write locks.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ZKInterProcessReadLock extends ZKInterProcessLockBase {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ZKInterProcessReadLock.class);
|
||||
|
||||
public ZKInterProcessReadLock(ZooKeeperWatcher zooKeeperWatcher,
|
||||
String znode, byte[] metadata, MetadataHandler handler) {
|
||||
super(zooKeeperWatcher, znode, metadata, handler, READ_LOCK_CHILD_NODE_PREFIX);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
protected String getLockPath(String createdZNode, List<String> children) throws IOException {
|
||||
TreeSet<String> writeChildren =
|
||||
new TreeSet<String>(ZNodeComparator.COMPARATOR);
|
||||
for (String child : children) {
|
||||
if (isChildWriteLock(child)) {
|
||||
writeChildren.add(child);
|
||||
}
|
||||
}
|
||||
if (writeChildren.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
SortedSet<String> lowerChildren = writeChildren.headSet(createdZNode);
|
||||
if (lowerChildren.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
String pathToWatch = lowerChildren.last();
|
||||
String nodeHoldingLock = lowerChildren.first();
|
||||
String znode = ZKUtil.joinZNode(parentLockNode, nodeHoldingLock);
|
||||
handleLockMetadata(znode);
|
||||
|
||||
return pathToWatch;
|
||||
}
|
||||
}
|
|
@ -1,66 +0,0 @@
|
|||
/**
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper.lock;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
|
||||
import org.apache.hadoop.hbase.InterProcessReadWriteLock;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
|
||||
/**
|
||||
* ZooKeeper based implementation of {@link InterProcessReadWriteLock}. This lock is fair,
|
||||
* not reentrant, and not revocable.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ZKInterProcessReadWriteLock implements InterProcessReadWriteLock {
|
||||
|
||||
private final ZooKeeperWatcher zkWatcher;
|
||||
private final String znode;
|
||||
private final MetadataHandler handler;
|
||||
|
||||
/**
|
||||
* Creates a DistributedReadWriteLock instance.
|
||||
* @param zkWatcher
|
||||
* @param znode ZNode path for the lock
|
||||
* @param handler An object that will handle de-serializing and processing
|
||||
* the metadata associated with reader or writer locks
|
||||
* created by this object or null if none desired.
|
||||
*/
|
||||
public ZKInterProcessReadWriteLock(ZooKeeperWatcher zkWatcher, String znode,
|
||||
MetadataHandler handler) {
|
||||
this.zkWatcher = zkWatcher;
|
||||
this.znode = znode;
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public ZKInterProcessReadLock readLock(byte[] metadata) {
|
||||
return new ZKInterProcessReadLock(zkWatcher, znode, metadata, handler);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public ZKInterProcessWriteLock writeLock(byte[] metadata) {
|
||||
return new ZKInterProcessWriteLock(zkWatcher, znode, metadata, handler);
|
||||
}
|
||||
}
|
|
@ -1,61 +0,0 @@
|
|||
/**
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper.lock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
|
||||
/**
|
||||
* ZooKeeper based write lock:
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ZKInterProcessWriteLock extends ZKInterProcessLockBase {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ZKInterProcessWriteLock.class);
|
||||
|
||||
public ZKInterProcessWriteLock(ZooKeeperWatcher zooKeeperWatcher,
|
||||
String znode, byte[] metadata, MetadataHandler handler) {
|
||||
super(zooKeeperWatcher, znode, metadata, handler, WRITE_LOCK_CHILD_NODE_PREFIX);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
protected String getLockPath(String createdZNode, List<String> children) throws IOException {
|
||||
TreeSet<String> sortedChildren =
|
||||
new TreeSet<String>(ZNodeComparator.COMPARATOR);
|
||||
sortedChildren.addAll(children);
|
||||
String pathToWatch = sortedChildren.lower(createdZNode);
|
||||
if (pathToWatch != null) {
|
||||
String nodeHoldingLock = sortedChildren.first();
|
||||
String znode = ZKUtil.joinZNode(parentLockNode, nodeHoldingLock);
|
||||
handleLockMetadata(znode);
|
||||
}
|
||||
return pathToWatch;
|
||||
}
|
||||
}
|
|
@ -1,360 +0,0 @@
|
|||
/**
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper.lock;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.InterProcessLock;
|
||||
import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
|
||||
import org.apache.hadoop.hbase.MultithreadedTestUtil;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
@Category({MiscTests.class, MediumTests.class})
|
||||
public class TestZKInterProcessReadWriteLock {
|
||||
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(TestZKInterProcessReadWriteLock.class);
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL =
|
||||
new HBaseTestingUtility();
|
||||
|
||||
private static final int NUM_THREADS = 10;
|
||||
|
||||
private static Configuration conf;
|
||||
|
||||
private final AtomicBoolean isLockHeld = new AtomicBoolean(false);
|
||||
private final ExecutorService executor =
|
||||
Executors.newFixedThreadPool(NUM_THREADS,
|
||||
new DaemonThreadFactory("TestZKInterProcessReadWriteLock-"));
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeAllTests() throws Exception {
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
TEST_UTIL.startMiniZKCluster();
|
||||
conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 1000);
|
||||
ZooKeeperWatcher zkw = getZooKeeperWatcher("setup");
|
||||
ZKUtil.createWithParents(zkw, zkw.znodePaths.tableLockZNode);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterAllTests() throws Exception {
|
||||
TEST_UTIL.shutdownMiniZKCluster();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
private static ZooKeeperWatcher getZooKeeperWatcher(String desc)
|
||||
throws IOException {
|
||||
return TEST_UTIL.getZooKeeperWatcher();
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testWriteLockExcludesWriters() throws Exception {
|
||||
final String testName = "testWriteLockExcludesWriters";
|
||||
final ZKInterProcessReadWriteLock readWriteLock =
|
||||
getReadWriteLock(testName);
|
||||
List<Future<Void>> results = Lists.newArrayList();
|
||||
for (int i = 0; i < NUM_THREADS; ++i) {
|
||||
final String threadDesc = testName + i;
|
||||
results.add(executor.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException {
|
||||
ZKInterProcessWriteLock writeLock =
|
||||
readWriteLock.writeLock(Bytes.toBytes(threadDesc));
|
||||
try {
|
||||
writeLock.acquire();
|
||||
try {
|
||||
// No one else should hold the lock
|
||||
assertTrue(isLockHeld.compareAndSet(false, true));
|
||||
Thread.sleep(1000);
|
||||
// No one else should have released the lock
|
||||
assertTrue(isLockHeld.compareAndSet(true, false));
|
||||
} finally {
|
||||
isLockHeld.set(false);
|
||||
writeLock.release();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn(threadDesc + " interrupted", e);
|
||||
Thread.currentThread().interrupt();
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
|
||||
}
|
||||
MultithreadedTestUtil.assertOnFutures(results);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testReadLockDoesNotExcludeReaders() throws Exception {
|
||||
final String testName = "testReadLockDoesNotExcludeReaders";
|
||||
final ZKInterProcessReadWriteLock readWriteLock =
|
||||
getReadWriteLock(testName);
|
||||
final CountDownLatch locksAcquiredLatch = new CountDownLatch(NUM_THREADS);
|
||||
final AtomicInteger locksHeld = new AtomicInteger(0);
|
||||
List<Future<Void>> results = Lists.newArrayList();
|
||||
for (int i = 0; i < NUM_THREADS; ++i) {
|
||||
final String threadDesc = testName + i;
|
||||
results.add(executor.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
ZKInterProcessReadLock readLock =
|
||||
readWriteLock.readLock(Bytes.toBytes(threadDesc));
|
||||
readLock.acquire();
|
||||
try {
|
||||
locksHeld.incrementAndGet();
|
||||
locksAcquiredLatch.countDown();
|
||||
Thread.sleep(1000);
|
||||
} finally {
|
||||
readLock.release();
|
||||
locksHeld.decrementAndGet();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
}
|
||||
locksAcquiredLatch.await();
|
||||
assertEquals(locksHeld.get(), NUM_THREADS);
|
||||
MultithreadedTestUtil.assertOnFutures(results);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testReadLockExcludesWriters() throws Exception {
|
||||
// Submit a read lock request first
|
||||
// Submit a write lock request second
|
||||
final String testName = "testReadLockExcludesWriters";
|
||||
List<Future<Void>> results = Lists.newArrayList();
|
||||
final CountDownLatch readLockAcquiredLatch = new CountDownLatch(1);
|
||||
Callable<Void> acquireReadLock = new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final String threadDesc = testName + "-acquireReadLock";
|
||||
ZKInterProcessReadLock readLock =
|
||||
getReadWriteLock(testName).readLock(Bytes.toBytes(threadDesc));
|
||||
readLock.acquire();
|
||||
try {
|
||||
assertTrue(isLockHeld.compareAndSet(false, true));
|
||||
readLockAcquiredLatch.countDown();
|
||||
Thread.sleep(1000);
|
||||
} finally {
|
||||
isLockHeld.set(false);
|
||||
readLock.release();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
Callable<Void> acquireWriteLock = new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final String threadDesc = testName + "-acquireWriteLock";
|
||||
ZKInterProcessWriteLock writeLock =
|
||||
getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
|
||||
readLockAcquiredLatch.await();
|
||||
assertTrue(isLockHeld.get());
|
||||
writeLock.acquire();
|
||||
try {
|
||||
assertFalse(isLockHeld.get());
|
||||
} finally {
|
||||
writeLock.release();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
results.add(executor.submit(acquireReadLock));
|
||||
results.add(executor.submit(acquireWriteLock));
|
||||
MultithreadedTestUtil.assertOnFutures(results);
|
||||
}
|
||||
|
||||
private static ZKInterProcessReadWriteLock getReadWriteLock(String testName)
|
||||
throws IOException {
|
||||
MetadataHandler handler = new MetadataHandler() {
|
||||
@Override
|
||||
public void handleMetadata(byte[] ownerMetadata) {
|
||||
LOG.info("Lock info: " + Bytes.toString(ownerMetadata));
|
||||
}
|
||||
};
|
||||
ZooKeeperWatcher zkWatcher = getZooKeeperWatcher(testName);
|
||||
String znode = ZKUtil.joinZNode(zkWatcher.znodePaths.tableLockZNode, testName);
|
||||
|
||||
return new ZKInterProcessReadWriteLock(zkWatcher, znode, handler);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testWriteLockExcludesReaders() throws Exception {
|
||||
// Submit a read lock request first
|
||||
// Submit a write lock request second
|
||||
final String testName = "testReadLockExcludesWriters";
|
||||
List<Future<Void>> results = Lists.newArrayList();
|
||||
final CountDownLatch writeLockAcquiredLatch = new CountDownLatch(1);
|
||||
Callable<Void> acquireWriteLock = new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final String threadDesc = testName + "-acquireWriteLock";
|
||||
ZKInterProcessWriteLock writeLock =
|
||||
getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
|
||||
writeLock.acquire();
|
||||
try {
|
||||
writeLockAcquiredLatch.countDown();
|
||||
assertTrue(isLockHeld.compareAndSet(false, true));
|
||||
Thread.sleep(1000);
|
||||
} finally {
|
||||
isLockHeld.set(false);
|
||||
writeLock.release();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
Callable<Void> acquireReadLock = new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final String threadDesc = testName + "-acquireReadLock";
|
||||
ZKInterProcessReadLock readLock =
|
||||
getReadWriteLock(testName).readLock(Bytes.toBytes(threadDesc));
|
||||
writeLockAcquiredLatch.await();
|
||||
readLock.acquire();
|
||||
try {
|
||||
assertFalse(isLockHeld.get());
|
||||
} finally {
|
||||
readLock.release();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
results.add(executor.submit(acquireWriteLock));
|
||||
results.add(executor.submit(acquireReadLock));
|
||||
MultithreadedTestUtil.assertOnFutures(results);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testTimeout() throws Exception {
|
||||
final String testName = "testTimeout";
|
||||
final CountDownLatch lockAcquiredLatch = new CountDownLatch(1);
|
||||
Callable<Void> shouldHog = new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final String threadDesc = testName + "-shouldHog";
|
||||
ZKInterProcessWriteLock lock =
|
||||
getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
|
||||
lock.acquire();
|
||||
lockAcquiredLatch.countDown();
|
||||
Thread.sleep(10000);
|
||||
lock.release();
|
||||
return null;
|
||||
}
|
||||
};
|
||||
Callable<Void> shouldTimeout = new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final String threadDesc = testName + "-shouldTimeout";
|
||||
ZKInterProcessWriteLock lock =
|
||||
getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
|
||||
lockAcquiredLatch.await();
|
||||
assertFalse(lock.tryAcquire(5000));
|
||||
return null;
|
||||
}
|
||||
};
|
||||
Callable<Void> shouldAcquireLock = new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final String threadDesc = testName + "-shouldAcquireLock";
|
||||
ZKInterProcessWriteLock lock =
|
||||
getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
|
||||
lockAcquiredLatch.await();
|
||||
assertTrue(lock.tryAcquire(30000));
|
||||
lock.release();
|
||||
return null;
|
||||
}
|
||||
};
|
||||
List<Future<Void>> results = Lists.newArrayList();
|
||||
results.add(executor.submit(shouldHog));
|
||||
results.add(executor.submit(shouldTimeout));
|
||||
results.add(executor.submit(shouldAcquireLock));
|
||||
MultithreadedTestUtil.assertOnFutures(results);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testMultipleClients() throws Exception {
|
||||
//tests lock usage from multiple zookeeper clients with different sessions.
|
||||
//acquire one read lock, then one write lock
|
||||
final String testName = "testMultipleClients";
|
||||
|
||||
//different zookeeper sessions with separate identifiers
|
||||
ZooKeeperWatcher zkWatcher1 = new ZooKeeperWatcher(conf, "testMultipleClients-1", null);
|
||||
ZooKeeperWatcher zkWatcher2 = new ZooKeeperWatcher(conf, "testMultipleClients-2", null);
|
||||
|
||||
String znode = ZKUtil.joinZNode(zkWatcher1.znodePaths.tableLockZNode, testName);
|
||||
|
||||
ZKInterProcessReadWriteLock clientLock1
|
||||
= new ZKInterProcessReadWriteLock(zkWatcher1, znode, null);
|
||||
ZKInterProcessReadWriteLock clientLock2
|
||||
= new ZKInterProcessReadWriteLock(zkWatcher2, znode, null);
|
||||
|
||||
InterProcessLock lock1 = clientLock1.readLock(Bytes.toBytes("client1"));
|
||||
lock1.acquire();
|
||||
|
||||
//try to acquire, but it will timeout. We are testing whether this will cause any problems
|
||||
//due to the read lock being from another client
|
||||
InterProcessLock lock2 = clientLock2.writeLock(Bytes.toBytes("client2"));
|
||||
assertFalse(lock2.tryAcquire(1000));
|
||||
|
||||
lock1.release();
|
||||
|
||||
//this time it will acquire
|
||||
assertTrue(lock2.tryAcquire(5000));
|
||||
lock2.release();
|
||||
zkWatcher1.close();
|
||||
zkWatcher2.close();
|
||||
}
|
||||
}
|
4
pom.xml
4
pom.xml
|
@ -3076,7 +3076,7 @@
|
|||
<name>User API</name>
|
||||
<description>The HBase Application Programmer's API</description>
|
||||
<excludePackageNames>
|
||||
org.apache.hadoop.hbase.backup*:org.apache.hadoop.hbase.catalog:org.apache.hadoop.hbase.client.coprocessor:org.apache.hadoop.hbase.client.metrics:org.apache.hadoop.hbase.codec*:org.apache.hadoop.hbase.constraint:org.apache.hadoop.hbase.coprocessor.*:org.apache.hadoop.hbase.executor:org.apache.hadoop.hbase.fs:*.generated.*:org.apache.hadoop.hbase.io.hfile.*:org.apache.hadoop.hbase.mapreduce.hadoopbackport:org.apache.hadoop.hbase.mapreduce.replication:org.apache.hadoop.hbase.master.*:org.apache.hadoop.hbase.metrics*:org.apache.hadoop.hbase.migration:org.apache.hadoop.hbase.monitoring:org.apache.hadoop.hbase.p*:org.apache.hadoop.hbase.regionserver.compactions:org.apache.hadoop.hbase.regionserver.handler:org.apache.hadoop.hbase.regionserver.snapshot:org.apache.hadoop.hbase.replication.*:org.apache.hadoop.hbase.rest.filter:org.apache.hadoop.hbase.rest.model:org.apache.hadoop.hbase.rest.p*:org.apache.hadoop.hbase.security.*:org.apache.hadoop.hbase.thrift*:org.apache.hadoop.hbase.tmpl.*:org.apache.hadoop.hbase.tool:org.apache.hadoop.hbase.trace:org.apache.hadoop.hbase.util.byterange*:org.apache.hadoop.hbase.util.test:org.apache.hadoop.hbase.util.vint:org.apache.hadoop.hbase.zookeeper.lock:org.apache.hadoop.metrics2*:org.apache.hadoop.hbase.io.compress*
|
||||
org.apache.hadoop.hbase.backup*:org.apache.hadoop.hbase.catalog:org.apache.hadoop.hbase.client.coprocessor:org.apache.hadoop.hbase.client.metrics:org.apache.hadoop.hbase.codec*:org.apache.hadoop.hbase.constraint:org.apache.hadoop.hbase.coprocessor.*:org.apache.hadoop.hbase.executor:org.apache.hadoop.hbase.fs:*.generated.*:org.apache.hadoop.hbase.io.hfile.*:org.apache.hadoop.hbase.mapreduce.hadoopbackport:org.apache.hadoop.hbase.mapreduce.replication:org.apache.hadoop.hbase.master.*:org.apache.hadoop.hbase.metrics*:org.apache.hadoop.hbase.migration:org.apache.hadoop.hbase.monitoring:org.apache.hadoop.hbase.p*:org.apache.hadoop.hbase.regionserver.compactions:org.apache.hadoop.hbase.regionserver.handler:org.apache.hadoop.hbase.regionserver.snapshot:org.apache.hadoop.hbase.replication.*:org.apache.hadoop.hbase.rest.filter:org.apache.hadoop.hbase.rest.model:org.apache.hadoop.hbase.rest.p*:org.apache.hadoop.hbase.security.*:org.apache.hadoop.hbase.thrift*:org.apache.hadoop.hbase.tmpl.*:org.apache.hadoop.hbase.tool:org.apache.hadoop.hbase.trace:org.apache.hadoop.hbase.util.byterange*:org.apache.hadoop.hbase.util.test:org.apache.hadoop.hbase.util.vint:org.apache.hadoop.metrics2*:org.apache.hadoop.hbase.io.compress*
|
||||
</excludePackageNames>
|
||||
<!-- switch on dependency-driven aggregation -->
|
||||
<includeDependencySources>false</includeDependencySources>
|
||||
|
@ -3133,7 +3133,7 @@
|
|||
<name>User API</name>
|
||||
<description>The HBase Application Programmer's API</description>
|
||||
<excludePackageNames>
|
||||
org.apache.hadoop.hbase.backup*:org.apache.hadoop.hbase.catalog:org.apache.hadoop.hbase.client.coprocessor:org.apache.hadoop.hbase.client.metrics:org.apache.hadoop.hbase.codec*:org.apache.hadoop.hbase.constraint:org.apache.hadoop.hbase.coprocessor.*:org.apache.hadoop.hbase.executor:org.apache.hadoop.hbase.fs:*.generated.*:org.apache.hadoop.hbase.io.hfile.*:org.apache.hadoop.hbase.mapreduce.hadoopbackport:org.apache.hadoop.hbase.mapreduce.replication:org.apache.hadoop.hbase.master.*:org.apache.hadoop.hbase.metrics*:org.apache.hadoop.hbase.migration:org.apache.hadoop.hbase.monitoring:org.apache.hadoop.hbase.p*:org.apache.hadoop.hbase.regionserver.compactions:org.apache.hadoop.hbase.regionserver.handler:org.apache.hadoop.hbase.regionserver.snapshot:org.apache.hadoop.hbase.replication.*:org.apache.hadoop.hbase.rest.filter:org.apache.hadoop.hbase.rest.model:org.apache.hadoop.hbase.rest.p*:org.apache.hadoop.hbase.security.*:org.apache.hadoop.hbase.thrift*:org.apache.hadoop.hbase.tmpl.*:org.apache.hadoop.hbase.tool:org.apache.hadoop.hbase.trace:org.apache.hadoop.hbase.util.byterange*:org.apache.hadoop.hbase.util.test:org.apache.hadoop.hbase.util.vint:org.apache.hadoop.hbase.zookeeper.lock:org.apache.hadoop.metrics2*:org.apache.hadoop.hbase.io.compress*
|
||||
org.apache.hadoop.hbase.backup*:org.apache.hadoop.hbase.catalog:org.apache.hadoop.hbase.client.coprocessor:org.apache.hadoop.hbase.client.metrics:org.apache.hadoop.hbase.codec*:org.apache.hadoop.hbase.constraint:org.apache.hadoop.hbase.coprocessor.*:org.apache.hadoop.hbase.executor:org.apache.hadoop.hbase.fs:*.generated.*:org.apache.hadoop.hbase.io.hfile.*:org.apache.hadoop.hbase.mapreduce.hadoopbackport:org.apache.hadoop.hbase.mapreduce.replication:org.apache.hadoop.hbase.master.*:org.apache.hadoop.hbase.metrics*:org.apache.hadoop.hbase.migration:org.apache.hadoop.hbase.monitoring:org.apache.hadoop.hbase.p*:org.apache.hadoop.hbase.regionserver.compactions:org.apache.hadoop.hbase.regionserver.handler:org.apache.hadoop.hbase.regionserver.snapshot:org.apache.hadoop.hbase.replication.*:org.apache.hadoop.hbase.rest.filter:org.apache.hadoop.hbase.rest.model:org.apache.hadoop.hbase.rest.p*:org.apache.hadoop.hbase.security.*:org.apache.hadoop.hbase.thrift*:org.apache.hadoop.hbase.tmpl.*:org.apache.hadoop.hbase.tool:org.apache.hadoop.hbase.trace:org.apache.hadoop.hbase.util.byterange*:org.apache.hadoop.hbase.util.test:org.apache.hadoop.hbase.util.vint:org.apache.hadoop.metrics2*:org.apache.hadoop.hbase.io.compress*
|
||||
</excludePackageNames>
|
||||
<!-- switch on dependency-driven aggregation -->
|
||||
<includeDependencySources>false</includeDependencySources>
|
||||
|
|
Loading…
Reference in New Issue